Package org.jgroups.util
Class RingBufferSeqnoLockless<T>
- java.lang.Object
-
- org.jgroups.util.RingBufferSeqnoLockless<T>
-
- All Implemented Interfaces:
java.lang.Iterable<T>
public class RingBufferSeqnoLockless<T> extends java.lang.Object implements java.lang.Iterable<T>
Ring buffer, implemented with a circular array. Designed for multiple producers (add()) and a single consumer (remove()). Note that the remove() methods are not reentrant, so multiple consumers won't work correctly ! The buffer has a fixed capacity, and a low (LOW), highest delivered (HD) and highest received (HR) seqno. An element with a sequence number (seqno) > low + capacity or < HD will get discarded. Elements are added after HD, but cannot wrap around beyond LOW. Addition doesn't need to be sequential, e.g. adding 5, 6, 8 is OK (as long as a seqno doesn't pass LOW). Addition may advance HR. Addition of elements that are already present is a no-op, and will not set the element again. Removal of elements starts at HD+1; any non-null element is removed and HD is advanced accordingly. If a remove method is called with nullify=true, then removed elements are nulled and LOW is advanced as well (LOW=HD). Note that all removals in a given RingBufferLockless must either have nullify=true, or all must be false. It is not permitted to do some removals with nullify=true, and others with nullify=false, in the same RingBufferLockless. Thestable(long)
method is called periodically; it nulls all elements between LOW and HD and advances LOW to HD.- Since:
- 3.1
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
RingBufferSeqnoLockless.RingBufferIterator
-
Field Summary
Fields Modifier and Type Field Description protected java.util.concurrent.atomic.AtomicReferenceArray<T>
buf
Atomic ref array so that elements can be checked for null and set atomically.protected java.util.concurrent.locks.Condition
buffer_full
protected long
hd
The highest delivered seqno.protected java.util.concurrent.atomic.AtomicLong
hr
The highest received seqno.protected java.util.concurrent.locks.Lock
lock
Lock for adders to block on when the buffer is fullprotected long
low
The lowest seqno.protected long
offset
protected java.util.concurrent.atomic.AtomicBoolean
processing
protected boolean
running
-
Constructor Summary
Constructors Constructor Description RingBufferSeqnoLockless(int capacity, long offset)
Creates a RingBuffer
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description T
_get(long seqno)
Only used for testing !!boolean
add(long seqno, T element)
boolean
add(long seqno, T element, boolean block)
Adds a new element to the bufferprotected boolean
block(long seqno)
int
capacity()
protected int
count(boolean missing)
void
destroy()
T
get(long seqno)
java.util.List<T>
get(long from, long to)
Returns a list of messages in the range [from ..long[]
getDigest()
long
getHighestDelivered()
long
getHighestReceived()
long
getLow()
SeqnoList
getMissing()
java.util.concurrent.atomic.AtomicBoolean
getProcessing()
protected int
index(long seqno)
java.util.Iterator<T>
iterator()
Returns an iterator over the elements of the ring buffer in the range [HD+1 ..int
missing()
T
remove()
Removes the next element (at hd +1).T
remove(boolean nullify)
Removes the next element (at hd +1).java.util.List<T>
removeMany(boolean nullify, int max_results)
java.util.List<T>
removeMany(java.util.concurrent.atomic.AtomicBoolean processing, boolean nullify, int max_results)
double
saturation()
void
setHighestDelivered(long hd)
int
size()
int
spaceUsed()
void
stable(long seqno)
Nulls elements between low and seqno and forwards lowjava.lang.String
toString()
protected static void
validate(long seqno)
-
-
-
Field Detail
-
buf
protected final java.util.concurrent.atomic.AtomicReferenceArray<T> buf
Atomic ref array so that elements can be checked for null and set atomically. Should always be sized to a power of 2.
-
low
protected volatile long low
The lowest seqno. Moved forward by stable()
-
hd
protected volatile long hd
The highest delivered seqno. Moved forward by a remove method. The next message to be removed is hd +1
-
hr
protected final java.util.concurrent.atomic.AtomicLong hr
The highest received seqno. Moved forward by add(). The next message to be added is hr +1
-
offset
protected final long offset
-
lock
protected final java.util.concurrent.locks.Lock lock
Lock for adders to block on when the buffer is full
-
buffer_full
protected final java.util.concurrent.locks.Condition buffer_full
-
running
protected volatile boolean running
-
processing
protected final java.util.concurrent.atomic.AtomicBoolean processing
-
-
Method Detail
-
getLow
public long getLow()
-
getHighestDelivered
public long getHighestDelivered()
-
setHighestDelivered
public void setHighestDelivered(long hd)
-
getHighestReceived
public long getHighestReceived()
-
getDigest
public long[] getDigest()
-
getProcessing
public java.util.concurrent.atomic.AtomicBoolean getProcessing()
-
add
public boolean add(long seqno, T element)
-
add
public boolean add(long seqno, T element, boolean block)
Adds a new element to the buffer- Parameters:
seqno
- The seqno of the elementelement
- The elementblock
- If true, add() will block when the buffer is full until there is space. Else, add() will return immediately, either successfully or unsuccessfully (if the buffer is full)- Returns:
- True if the element was added, false otherwise.
-
remove
public T remove(boolean nullify)
Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !- Parameters:
nullify
- Nulls the element in the array if true- Returns:
- T if there was a non-null element at position hd +1, or null if the element at hd+1 was null, or hd+1 > hr.
-
remove
public T remove()
Removes the next element (at hd +1). Note that this method is not concurrent, as RingBuffer can only have 1 remover thread active at any time !- Returns:
- T if there was a non-null element at position hd +1, or null if the element at hd+1 was null.
-
removeMany
public java.util.List<T> removeMany(boolean nullify, int max_results)
-
removeMany
public java.util.List<T> removeMany(java.util.concurrent.atomic.AtomicBoolean processing, boolean nullify, int max_results)
-
get
public T get(long seqno)
-
_get
public T _get(long seqno)
Only used for testing !!
-
get
public java.util.List<T> get(long from, long to)
Returns a list of messages in the range [from .. to], including from and to- Parameters:
from
-to
-- Returns:
- A list of messages, or null if none in range [from .. to] was found
-
stable
public void stable(long seqno)
Nulls elements between low and seqno and forwards low
-
destroy
public void destroy()
-
capacity
public final int capacity()
-
size
public int size()
-
missing
public int missing()
-
spaceUsed
public int spaceUsed()
-
saturation
public double saturation()
-
getMissing
public SeqnoList getMissing()
-
iterator
public java.util.Iterator<T> iterator()
Returns an iterator over the elements of the ring buffer in the range [HD+1 .. HR]- Specified by:
iterator
in interfacejava.lang.Iterable<T>
- Returns:
- RingBufferIterator
- Throws:
java.util.NoSuchElementException
- is HD is moved forward during the iteration
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
validate
protected static final void validate(long seqno)
-
index
protected int index(long seqno)
-
block
protected boolean block(long seqno)
-
count
protected int count(boolean missing)
-
-