Package org.jgroups.util
Class DynamicBuffer<T>
- java.lang.Object
-
- org.jgroups.util.Buffer<T>
-
- org.jgroups.util.DynamicBuffer<T>
-
- All Implemented Interfaces:
java.io.Closeable
,java.lang.AutoCloseable
,java.lang.Iterable<T>
public class DynamicBuffer<T> extends Buffer<T>
Copy ofTable
. Implementation of)
, expanding and shrinking dynamically.
A store for elements (typically messages) to be retransmitted or delivered. Used on sender and receiver side.
DynamicBuffer maintains a matrix of elements, which are stored by mapping their seqno to an index. E.g. when we have 10 rows of 1000 elements each, and first_seqno is 3000, then an element with seqno=5600, will be stored in the 3rd row, at index 600.
Rows are removed when all elements in that row have been delivered.- Version:
- 3.1
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
DynamicBuffer.TableIterator
Iterates through all elements of the matrix.-
Nested classes/interfaces inherited from class org.jgroups.util.Buffer
Buffer.HighestDeliverable, Buffer.Missing, Buffer.NumDeliverable, Buffer.Options, Buffer.Remover<R>, Buffer.Visitor<T>
-
-
Field Summary
Fields Modifier and Type Field Description protected static long
DEFAULT_MAX_COMPACTION_TIME
protected static double
DEFAULT_RESIZE_FACTOR
protected int
elements_per_row
Must be a power of 2 for efficient modular arithmeticprotected long
last_compaction_timestamp
The time when the last compaction took place.protected T[][]
matrix
protected long
max_compaction_time
Time (in nanoseconds) after which a compaction should take place.protected int
num_compactions
protected int
num_moves
protected int
num_purges
protected int
num_resizes
protected int
num_rows
protected double
resize_factor
-
Constructor Summary
Constructors Constructor Description DynamicBuffer()
DynamicBuffer(int num_rows, int elements_per_row, long offset)
DynamicBuffer(int num_rows, int elements_per_row, long offset, double resize_factor)
DynamicBuffer(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time)
Creates a new tableDynamicBuffer(long offset)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected void
_compact()
Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly.T
_get(long seqno)
To be used only for testing; doesn't do any index or sanity checksboolean
add(long seqno, T element, java.util.function.Predicate<T> remove_filter, Buffer.Options __, boolean ignored)
Adds an element if the element at the given index is null.boolean
add(java.util.List<LongTuple<T>> list, boolean remove_added_elements, T const_value)
Adds elements from the listboolean
add(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter, boolean remove_from_batch, T const_value)
Adds all messages from the given batch to the tableint
capacity()
Returns the total capacity in the matrixvoid
compact()
protected int
computeIndex(long seqno)
Computes and returns the index within a row for seqnoprotected int
computeRow(long seqno)
Computes and returns the row index for seqno.protected long
findHighestSeqno(java.util.List<LongTuple<T>> list)
protected static <T> long
findHighestSeqno(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter)
void
forEach(long from, long to, Buffer.Visitor<T> visitor, boolean nullify)
Iterates over the matrix with range [from ..T
get(long seqno)
Returns an element at seqnoint
getElementsPerRow()
long
getMaxCompactionTime()
int
getNumCompactions()
int
getNumMoves()
int
getNumPurges()
int
getNumResizes()
int
getNumRows()
protected T[]
getRow(int index)
Returns a row.java.util.Iterator<T>
iterator()
java.util.Iterator<T>
iterator(long from, long to)
protected void
move(int num_rows)
Moves contents of matrix num_rows down.int
purge(long seqno, boolean force)
Removes all elements less than or equal to seqno from the buffer.T
remove(boolean nullify)
Removes the next non-null element and nulls the index if nullify=truejava.util.List<T>
removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter)
<R> R
removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter, java.util.function.Supplier<R> result_creator, java.util.function.BiConsumer<R,T> accumulator)
Removes elements from the table and adds them to the result created by result_creator.void
resetStats()
protected void
resize(long seqno)
Moves rows down the matrix, by removing purged rows.DynamicBuffer<T>
setMaxCompactionTime(long max_compaction_time)
java.util.stream.Stream<T>
stream()
java.util.stream.Stream<T>
stream(long from, long to)
-
Methods inherited from class org.jgroups.util.Buffer
add, close, computeSize, dump, forEach, getAdders, getDigest, getHighestDeliverable, getMissing, getMissing, getNumDeliverable, hd, high, highestDelivered, highestDelivered, isEmpty, lock, low, numMissing, offset, open, purge, remove, removeMany, size, toString
-
-
-
-
Field Detail
-
num_rows
protected final int num_rows
-
elements_per_row
protected final int elements_per_row
Must be a power of 2 for efficient modular arithmetic
-
resize_factor
protected final double resize_factor
-
matrix
protected T[][] matrix
-
max_compaction_time
protected long max_compaction_time
Time (in nanoseconds) after which a compaction should take place. 0 disables compaction
-
last_compaction_timestamp
protected long last_compaction_timestamp
The time when the last compaction took place. If acompact()
takes place and sees that the last compaction is more than max_compaction_time nanoseconds ago, a compaction will take place
-
num_compactions
protected int num_compactions
-
num_resizes
protected int num_resizes
-
num_moves
protected int num_moves
-
num_purges
protected int num_purges
-
DEFAULT_MAX_COMPACTION_TIME
protected static final long DEFAULT_MAX_COMPACTION_TIME
- See Also:
- Constant Field Values
-
DEFAULT_RESIZE_FACTOR
protected static final double DEFAULT_RESIZE_FACTOR
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
DynamicBuffer
public DynamicBuffer()
-
DynamicBuffer
public DynamicBuffer(long offset)
-
DynamicBuffer
public DynamicBuffer(int num_rows, int elements_per_row, long offset)
-
DynamicBuffer
public DynamicBuffer(int num_rows, int elements_per_row, long offset, double resize_factor)
-
DynamicBuffer
public DynamicBuffer(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time)
Creates a new table- Parameters:
num_rows
- the number of rows in the matrixelements_per_row
- the number of elements per rowoffset
- the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1resize_factor
- teh factor with which to increase the number of rowsmax_compaction_time
- the max time in milliseconds after we attempt a compaction
-
-
Method Detail
-
getElementsPerRow
public int getElementsPerRow()
-
capacity
public int capacity()
Returns the total capacity in the matrix
-
getNumRows
public int getNumRows()
-
getNumCompactions
public int getNumCompactions()
-
getNumMoves
public int getNumMoves()
-
getNumResizes
public int getNumResizes()
-
getNumPurges
public int getNumPurges()
-
getMaxCompactionTime
public long getMaxCompactionTime()
-
setMaxCompactionTime
public DynamicBuffer<T> setMaxCompactionTime(long max_compaction_time)
-
resetStats
public void resetStats()
- Overrides:
resetStats
in classBuffer<T>
-
add
public boolean add(long seqno, T element, java.util.function.Predicate<T> remove_filter, Buffer.Options __, boolean ignored)
Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.- Specified by:
add
in classBuffer<T>
- Parameters:
seqno
-element
-remove_filter
- If not null, a filter used to remove all consecutive messages passing the filter__
- Ignoredignored
- If true, don't block when no space is available, but instead drop the element. This parameter is set by calling Message.isFlagSet(DONT_BLOCK)- Returns:
- True if the element at the computed index was null, else false
-
add
public boolean add(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter, boolean remove_from_batch, T const_value)
Adds all messages from the given batch to the table- Specified by:
add
in classBuffer<T>
- Parameters:
batch
- The batchseqno_getter
- A function to return the sequence number (seqno) of a given Message. Must be non-null. If the function return -1, then the message won't be addedremove_from_batch
- If true, the message is removed from the batchconst_value
- If non-null, this value should be used rather than the values of the list tuples- Returns:
- True if at least 1 element was added successfully, false otherwise.
-
add
public boolean add(java.util.List<LongTuple<T>> list, boolean remove_added_elements, T const_value)
Description copied from class:Buffer
Adds elements from the list- Specified by:
add
in classBuffer<T>
- Parameters:
list
- The list of tuples of seqnos and elements. If remove_added_elements is true, if elements could not be added (e.g. because they were already present or the seqno was < HD), those elements will be removed from listremove_added_elements
- If true, elements that could not be added to the table are removed from listconst_value
- If non-null, this value should be used rather than the values of the list tuples- Returns:
- True if at least 1 element was added successfully, false otherwise.
-
get
public T get(long seqno)
Returns an element at seqno
-
_get
public T _get(long seqno)
To be used only for testing; doesn't do any index or sanity checks
-
remove
public T remove(boolean nullify)
Removes the next non-null element and nulls the index if nullify=true
-
removeMany
public java.util.List<T> removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter)
- Specified by:
removeMany
in classBuffer<T>
-
removeMany
public <R> R removeMany(boolean nullify, int max_results, java.util.function.Predicate<T> filter, java.util.function.Supplier<R> result_creator, java.util.function.BiConsumer<R,T> accumulator)
Removes elements from the table and adds them to the result created by result_creator. Between 0 and max_results elements are removed.- Specified by:
removeMany
in classBuffer<T>
- Type Parameters:
R
- the type of the result- Parameters:
nullify
- if true, the removed element will be nulledmax_results
- the max number of results to be returned, even if more elements would be removablefilter
- a filter which accepts (or rejects) elements into the result. If null, all elements will be acceptedresult_creator
- a supplier required to create the result, e.g. ArrayList::newaccumulator
- an accumulator accepting the result and an element, e.g. ArrayList::add- Returns:
- the result
-
purge
public int purge(long seqno, boolean force)
Removes all elements less than or equal to seqno from the buffer. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed.
-
compact
public void compact()
-
forEach
public void forEach(long from, long to, Buffer.Visitor<T> visitor, boolean nullify)
Iterates over the matrix with range [from .. to] (including from and to), and callsBuffer.Visitor.visit(long, Object)
. If the visit() method returns false, the iteration is terminated.
This method must be called with the lock held
-
iterator
public java.util.Iterator<T> iterator()
-
iterator
public java.util.Iterator<T> iterator(long from, long to)
-
stream
public java.util.stream.Stream<T> stream(long from, long to)
-
findHighestSeqno
protected static <T> long findHighestSeqno(MessageBatch batch, java.util.function.Function<T,java.lang.Long> seqno_getter)
-
resize
protected void resize(long seqno)
Moves rows down the matrix, by removing purged rows. If resizing to accommodate seqno is still needed, computes a new size. Then either moves existing rows down, or copies them into a new array (if resizing took place). The lock must be held by the caller of resize().
-
move
protected void move(int num_rows)
Moves contents of matrix num_rows down. Avoids a System.arraycopy(). Caller must hold the lock.
-
_compact
protected void _compact()
Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly. The capacity of the matrix should be size * resize_factor. Caller must hold the lock.
-
getRow
protected T[] getRow(int index)
Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist- Parameters:
index
-- Returns:
- A row
-
computeRow
protected int computeRow(long seqno)
Computes and returns the row index for seqno. The caller must hold the lock.
-
computeIndex
protected int computeIndex(long seqno)
Computes and returns the index within a row for seqno
-
-