org.jgroups.util
Class RingBuffer<T>

java.lang.Object
  extended by org.jgroups.util.RingBuffer<T>
All Implemented Interfaces:
java.lang.Iterable<T>

public class RingBuffer<T>
extends java.lang.Object
implements java.lang.Iterable<T>

Ring buffer, implemented with a circular array. Designed for multiple producers (add()) and a single consumer (remove()). Note that the remove() methods are not reentrant, so multiple consumers won't work correctly !

The buffer has a fixed capacity, and a low (LOW), highest delivered (HD) and highest received (HR) seqno.

An element with a sequence number (seqno) > low + capacity or < HD will get discarded.

Elements are added after HD, but cannot wrap around beyond LOW. Addition doesn't need to be sequential, e.g. adding 5, 6, 8 is OK (as long as a seqno doesn't pass LOW). Addition may advance HR. Addition of elements that are already present is a no-op, and will not set the element again.

Removal of elements starts at HD+1; any non-null element is removed and HD is advanced accordingly. If a remove method is called with nullify=true, then removed elements are nulled and LOW is advanced as well (LOW=HD). Note that all removals in a given RingBuffer must either have nullify=true, or all must be false. It is not permitted to do some removals with nullify=true, and others with nullify=false, in the same RingBuffer.

The stable(long) method is called periodically; it nulls all elements between LOW and HD and advances LOW to HD.

The design of RingBuffer is discussed in doc/design/RingBuffer.txt.

Since:
3.1
Author:
Bela Ban

Nested Class Summary
protected  class RingBuffer.RingBufferIterator<T>
           
 
Field Summary
protected  T[] buf
          Atomic ref array so that elements can be checked for null and set atomically
protected  java.util.concurrent.locks.Condition buffer_full
           
protected  long hd
          The highest delivered seqno.
protected  long hr
          The highest received seqno.
protected  java.util.concurrent.locks.Lock lock
          Lock for adders to block on when the buffer is full
protected  long low
          The lowest seqno.
protected  long offset
           
protected  java.util.concurrent.atomic.AtomicBoolean processing
           
protected  boolean running
           
 
Constructor Summary
RingBuffer(int capacity, long offset)
          Creates a RingBuffer
 
Method Summary
 T _get(long seqno)
          Only used for testing !!
 boolean add(long seqno, T element)
           
 boolean add(long seqno, T element, boolean block)
          Adds a new element to the buffer
protected  boolean block(long seqno)
           
 int capacity()
           
protected  int count(boolean missing)
           
 void destroy()
           
 T get(long seqno)
           
 java.util.List<T> get(long from, long to)
          Returns a list of messages in the range [from ..
 long[] getDigest()
           
 long getHighestDelivered()
           
 long getHighestReceived()
           
 long getLow()
           
 SeqnoList getMissing()
           
 java.util.concurrent.atomic.AtomicBoolean getProcessing()
           
protected  int index(long seqno)
           
 java.util.Iterator<T> iterator()
          Returns an iterator over the elements of the ring buffer in the range [HD+1 ..
 int missing()
           
 T remove()
          Removes the next element (at hd +1).
 T remove(boolean nullify)
          Removes the next element (at hd +1).
 java.util.List<T> removeMany(java.util.concurrent.atomic.AtomicBoolean processing, boolean nullify, int max_results)
           
 java.util.List<T> removeMany(boolean nullify, int max_results)
           
 double saturation()
           
 void setHighestDelivered(long hd)
           
 int size()
           
 int spaceUsed()
           
 void stable(long seqno)
          Nulls elements between low and seqno and forwards low
 java.lang.String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

buf

protected final T[] buf
Atomic ref array so that elements can be checked for null and set atomically


low

protected long low
The lowest seqno. Moved forward by stable()


hd

protected long hd
The highest delivered seqno. Moved forward by a remove method. The next message to be removed is hd +1


hr

protected long hr
The highest received seqno. Moved forward by add(). The next message to be added is hr +1


offset

protected final long offset

lock

protected final java.util.concurrent.locks.Lock lock
Lock for adders to block on when the buffer is full


buffer_full

protected final java.util.concurrent.locks.Condition buffer_full

running

protected boolean running

processing

protected final java.util.concurrent.atomic.AtomicBoolean processing
Constructor Detail

RingBuffer

public RingBuffer(int capacity,
                  long offset)
Creates a RingBuffer

Parameters:
capacity - The number of elements the ring buffer's array should hold
offset - The offset. The first element to be added has to be offset +1.
Method Detail

getLow

public long getLow()

getHighestDelivered

public long getHighestDelivered()

setHighestDelivered

public void setHighestDelivered(long hd)

getHighestReceived

public long getHighestReceived()

getDigest

public long[] getDigest()

getProcessing

public java.util.concurrent.atomic.AtomicBoolean getProcessing()

add

public boolean add(long seqno,
                   T element)

add

public boolean add(long seqno,
                   T element,
                   boolean block)
Adds a new element to the buffer

Parameters:
seqno - The seqno of the element
element - The element
block - If true, add() will block when the buffer is full until there is space. Else, add() will return immediately, either successfully or unsuccessfully (if the buffer is full)
Returns:
True if the element was added, false otherwise.

remove

public T remove(boolean nullify)
Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !

Parameters:
nullify - Nulls the element in the array if true
Returns:
T if there was a non-null element at position hd +1, or null if the element at hd+1 was null, or hd+1 > hr.

remove

public T remove()
Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !

Returns:
T if there was a non-null element at position hd +1, or null if the element at hd+1 was null.

removeMany

public java.util.List<T> removeMany(boolean nullify,
                                    int max_results)

removeMany

public java.util.List<T> removeMany(java.util.concurrent.atomic.AtomicBoolean processing,
                                    boolean nullify,
                                    int max_results)

get

public T get(long seqno)

_get

public T _get(long seqno)
Only used for testing !!


get

public java.util.List<T> get(long from,
                             long to)
Returns a list of messages in the range [from .. to], including from and to

Parameters:
from -
to -
Returns:
A list of messages, or null if none in range [from .. to] was found

stable

public void stable(long seqno)
Nulls elements between low and seqno and forwards low


destroy

public void destroy()

capacity

public final int capacity()

size

public int size()

missing

public int missing()

spaceUsed

public int spaceUsed()

saturation

public double saturation()

getMissing

public SeqnoList getMissing()

iterator

public java.util.Iterator<T> iterator()
Returns an iterator over the elements of the ring buffer in the range [HD+1 .. HR]

Specified by:
iterator in interface java.lang.Iterable<T>
Returns:
RingBufferIterator
Throws:
java.util.NoSuchElementException - is HD is moved forward during the iteration

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

index

protected int index(long seqno)

block

protected boolean block(long seqno)

count

protected int count(boolean missing)


Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.