Class DynamicBuffer<T>

  • All Implemented Interfaces:
    java.io.Closeable, java.lang.AutoCloseable, java.lang.Iterable<T>

    public class DynamicBuffer<T>
    extends Buffer<T>
    Copy of Table. Implementation of Buffer that expands and shrinks dynamically.
    A store for elements (typically messages) to be retransmitted or delivered. Used on sender and receiver side.
    DynamicBuffer maintains a matrix of elements, which are stored by mapping their seqno to an index. E.g. with 10 rows of 1000 elements each and first_seqno=3000, an element with seqno=5600 is stored in the 3rd row, at index 600.
    Rows are removed when all elements in that row have been delivered.
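
The seqno-to-position mapping described above can be sketched as follows. This is a minimal illustration, not the class's actual source: the class SeqnoMapping and its method names are hypothetical, and it assumes first_seqno is the seqno stored at row 0, index 0.

```java
// Hypothetical stand-in for the mapping described above; not the library's source.
final class SeqnoMapping {

    /** Zero-based row holding seqno, assuming firstSeqno sits at row 0, index 0. */
    static int row(long seqno, long firstSeqno, int elementsPerRow) {
        return (int) ((seqno - firstSeqno) / elementsPerRow);
    }

    /** Position of seqno inside its row. */
    static int index(long seqno, long firstSeqno, int elementsPerRow) {
        return (int) ((seqno - firstSeqno) % elementsPerRow);
    }

    public static void main(String[] args) {
        // 10 rows of 1000 elements each, first_seqno = 3000, seqno = 5600
        System.out.println(row(5600, 3000, 1000));   // 2, i.e. the 3rd row
        System.out.println(index(5600, 3000, 1000)); // 600
    }
}
```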
    Version:
    3.1
    Author:
    Bela Ban
    • Field Detail

      • num_rows

        protected final int num_rows
      • elements_per_row

        protected final int elements_per_row
        Must be a power of 2 for efficient modular arithmetic
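
Why a power of 2 helps: division and modulo by the row length can then be replaced with a shift and a bit mask. The sketch below shows that general technique. PowerOfTwoIndex and its methods are hypothetical names, and whether the class computes its indices exactly this way is an assumption.

```java
// Hypothetical helper showing the shift/mask technique; names are illustrative only.
final class PowerOfTwoIndex {

    static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
    }

    /** Same result as offset % elementsPerRow for offset >= 0 and a power-of-2 row length. */
    static int indexInRow(long offset, int elementsPerRow) {
        if (!isPowerOfTwo(elementsPerRow))
            throw new IllegalArgumentException("elementsPerRow must be a power of 2");
        return (int) (offset & (elementsPerRow - 1));
    }

    /** Same result as offset / elementsPerRow for offset >= 0 and a power-of-2 row length. */
    static int rowOf(long offset, int elementsPerRow) {
        if (!isPowerOfTwo(elementsPerRow))
            throw new IllegalArgumentException("elementsPerRow must be a power of 2");
        return (int) (offset >> Integer.numberOfTrailingZeros(elementsPerRow));
    }
}
```

Here offset stands for the distance of a seqno from the first seqno held in the matrix.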
      • resize_factor

        protected final double resize_factor
      • matrix

        protected T[][] matrix
      • max_compaction_time

        protected long max_compaction_time
        Time (in nanoseconds) after which a compaction should take place. 0 disables compaction
      • last_compaction_timestamp

        protected long last_compaction_timestamp
        The time at which the last compaction took place. When compact() runs and the last compaction happened more than max_compaction_time nanoseconds ago, a compaction takes place
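
The interplay of max_compaction_time and last_compaction_timestamp can be sketched as a simple elapsed-time check. CompactionTimer is a hypothetical class written for illustration, and taking the current time as a parameter (rather than reading a clock internally) is an assumption made to keep the example deterministic.

```java
// Hypothetical sketch of the time-based compaction check; not the library's source.
final class CompactionTimer {
    private final long maxCompactionTimeNanos; // 0 disables compaction
    private long lastCompactionTimestamp;      // nanoseconds

    CompactionTimer(long maxCompactionTimeNanos, long now) {
        this.maxCompactionTimeNanos = maxCompactionTimeNanos;
        this.lastCompactionTimestamp = now;
    }

    /** Returns true and records 'now' if more than the max time has passed since the last compaction. */
    boolean compactionDue(long now) {
        if (maxCompactionTimeNanos <= 0)
            return false; // compaction disabled
        if (now - lastCompactionTimestamp <= maxCompactionTimeNanos)
            return false; // too soon
        lastCompactionTimestamp = now;
        return true;
    }
}
```

In real use, the value passed as now would typically come from System.nanoTime().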
      • num_compactions

        protected int num_compactions
      • num_resizes

        protected int num_resizes
      • num_moves

        protected int num_moves
      • num_purges

        protected int num_purges
      • DEFAULT_MAX_COMPACTION_TIME

        protected static final long DEFAULT_MAX_COMPACTION_TIME
        See Also:
        Constant Field Values
      • DEFAULT_RESIZE_FACTOR

        protected static final double DEFAULT_RESIZE_FACTOR
        See Also:
        Constant Field Values
    • Constructor Detail

      • DynamicBuffer

        public DynamicBuffer()
      • DynamicBuffer

        public DynamicBuffer​(long offset)
      • DynamicBuffer

        public DynamicBuffer​(int num_rows,
                             int elements_per_row,
                             long offset)
      • DynamicBuffer

        public DynamicBuffer​(int num_rows,
                             int elements_per_row,
                             long offset,
                             double resize_factor)
      • DynamicBuffer

        public DynamicBuffer​(int num_rows,
                             int elements_per_row,
                             long offset,
                             double resize_factor,
                             long max_compaction_time)
        Creates a new table
        Parameters:
        num_rows - the number of rows in the matrix
        elements_per_row - the number of elements per row
        offset - the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1
        resize_factor - the factor with which to increase the number of rows
        max_compaction_time - the max time in milliseconds after which we attempt a compaction
    • Method Detail

      • getElementsPerRow

        public int getElementsPerRow()
      • capacity

        public int capacity()
        Returns the total capacity in the matrix
        Specified by:
        capacity in class Buffer<T>
      • getNumRows

        public int getNumRows()
      • getNumCompactions

        public int getNumCompactions()
      • getNumMoves

        public int getNumMoves()
      • getNumResizes

        public int getNumResizes()
      • getNumPurges

        public int getNumPurges()
      • getMaxCompactionTime

        public long getMaxCompactionTime()
      • setMaxCompactionTime

        public DynamicBuffer<T> setMaxCompactionTime​(long max_compaction_time)
      • add

        public boolean add​(long seqno,
                           T element,
                           java.util.function.Predicate<T> remove_filter,
                           Buffer.Options __,
                           boolean ignored)
        Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.
        Specified by:
        add in class Buffer<T>
        Parameters:
        seqno - The sequence number of the element
        element - The element to add
        remove_filter - If not null, a filter used to remove all consecutive messages passing the filter
        __ - Ignored
        ignored - If true, don't block when no space is available, but instead drop the element. This parameter is set by calling Message.isFlagSet(DONT_BLOCK)
        Returns:
        True if the element at the computed index was null, else false
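
The add-only-if-absent contract can be illustrated on a single row. AddIfAbsent is a hypothetical helper. It omits the seqno-to-index mapping, the remove_filter and any locking, so it shows only the return-value semantics described above.

```java
// Hypothetical sketch of the "add only if the slot is null" contract.
final class AddIfAbsent {

    /** Stores element at index if the slot is empty; returns false and leaves the slot untouched otherwise. */
    static <T> boolean add(T[] slots, int index, T element) {
        if (slots[index] != null)
            return false; // an element already exists: do not overwrite
        slots[index] = element;
        return true;
    }
}
```

A duplicate add for the same slot therefore returns false and keeps the first element.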
      • add

        public boolean add​(MessageBatch batch,
                           java.util.function.Function<T,​java.lang.Long> seqno_getter,
                           boolean remove_from_batch,
                           T const_value)
        Adds all messages from the given batch to the table
        Specified by:
        add in class Buffer<T>
        Parameters:
        batch - The batch
        seqno_getter - A function to return the sequence number (seqno) of a given Message. Must be non-null. If the function returns -1, then the message won't be added
        remove_from_batch - If true, the message is removed from the batch
        const_value - If non-null, this value should be used rather than the values of the list tuples
        Returns:
        True if at least 1 element was added successfully, false otherwise.
      • add

        public boolean add​(java.util.List<LongTuple<T>> list,
                           boolean remove_added_elements,
                           T const_value)
        Description copied from class: Buffer
        Adds elements from the list
        Specified by:
        add in class Buffer<T>
        Parameters:
        list - The list of tuples of seqnos and elements. If remove_added_elements is true, elements that could not be added (e.g. because they were already present or their seqno was < HD) are removed from list
        remove_added_elements - If true, elements that could not be added to the table are removed from list
        const_value - If non-null, this value should be used rather than the values of the list tuples
        Returns:
        True if at least 1 element was added successfully, false otherwise.
      • get

        public T get​(long seqno)
        Returns the element at seqno
        Specified by:
        get in class Buffer<T>
        Parameters:
        seqno - The sequence number of the element
        Returns:
        The element at seqno
      • _get

        public T _get​(long seqno)
        To be used only for testing; doesn't do any index or sanity checks
        Specified by:
        _get in class Buffer<T>
        Parameters:
        seqno -
        Returns:
      • remove

        public T remove​(boolean nullify)
        Removes the next non-null element and nulls the index if nullify=true
        Specified by:
        remove in class Buffer<T>
      • removeMany

        public java.util.List<T> removeMany​(boolean nullify,
                                            int max_results,
                                            java.util.function.Predicate<T> filter)
        Specified by:
        removeMany in class Buffer<T>
      • removeMany

        public <R> R removeMany​(boolean nullify,
                                int max_results,
                                java.util.function.Predicate<T> filter,
                                java.util.function.Supplier<R> result_creator,
                                java.util.function.BiConsumer<R,​T> accumulator)
        Removes elements from the table and adds them to the result created by result_creator. Between 0 and max_results elements are removed.
        Specified by:
        removeMany in class Buffer<T>
        Type Parameters:
        R - the type of the result
        Parameters:
        nullify - if true, the removed element will be nulled
        max_results - the max number of results to be returned, even if more elements would be removable
        filter - a filter which accepts (or rejects) elements into the result. If null, all elements will be accepted
        result_creator - a supplier required to create the result, e.g. ArrayList::new
        accumulator - an accumulator accepting the result and an element, e.g. ArrayList::add
        Returns:
        the result
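
The result_creator / accumulator pattern can be sketched independently of the matrix. RemoveManySketch is a hypothetical class operating on a plain Deque. How the real method treats elements rejected by the filter is not stated above, so this sketch simply drops them without counting them, which is an assumption.

```java
import java.util.Deque;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of the supplier/accumulator pattern; not the library's source.
final class RemoveManySketch {

    static <T, R> R removeMany(Deque<T> pending, int maxResults, Predicate<T> filter,
                               Supplier<R> resultCreator, BiConsumer<R, T> accumulator) {
        R result = resultCreator.get(); // e.g. ArrayList::new
        int accepted = 0;
        while (accepted < maxResults && !pending.isEmpty()) {
            T element = pending.poll(); // remove the next element
            if (filter == null || filter.test(element)) {
                accumulator.accept(result, element); // e.g. List::add
                accepted++;
            }
        }
        return result;
    }
}
```

A call such as removeMany(queue, 2, null, ArrayList::new, List::add), assigned to a List, collects at most two elements into a new list and leaves the rest in the queue.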
      • purge

        public int purge​(long seqno,
                         boolean force)
        Removes all elements less than or equal to seqno from the buffer. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed.
        Specified by:
        purge in class Buffer<T>
        Parameters:
        seqno - All elements <= seqno will be nulled
        force - If true, we only ensure that seqno <= hr, but don't care about hd, and set hd=low=seqno.
        Returns:
        0. The number of purged elements
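
Row-wise purging can be sketched as follows: rows that lie entirely at or below seqno are dropped as a whole, and the first row that cannot be dropped is nulled element by element. PurgeSketch is a hypothetical class. It ignores force, hd, hr and low, counts the non-null elements it clears, and nulls up to and including seqno, all of which are assumptions of this sketch.

```java
// Hypothetical sketch of row-wise purging; not the library's source.
final class PurgeSketch {

    /** Nulls every element with seqno <= upTo and returns how many non-null elements were cleared. */
    static <T> int purge(T[][] matrix, long firstSeqno, int elementsPerRow, long upTo) {
        long offset = upTo - firstSeqno;
        if (offset < 0)
            return 0; // nothing stored at or below upTo
        int purged = 0;
        int lastRow = (int) (offset / elementsPerRow);
        int lastIndex = (int) (offset % elementsPerRow);

        // Rows entirely at or below upTo: drop the whole row.
        for (int r = 0; r < lastRow && r < matrix.length; r++) {
            if (matrix[r] == null)
                continue;
            for (T element : matrix[r])
                if (element != null)
                    purged++;
            matrix[r] = null;
        }

        // First row that cannot be dropped: null only the leading elements.
        if (lastRow < matrix.length && matrix[lastRow] != null) {
            for (int i = 0; i <= lastIndex; i++) {
                if (matrix[lastRow][i] != null) {
                    matrix[lastRow][i] = null;
                    purged++;
                }
            }
        }
        return purged;
    }
}
```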
      • compact

        public void compact()
      • forEach

        public void forEach​(long from,
                            long to,
                            Buffer.Visitor<T> visitor,
                            boolean nullify)
        Iterates over the matrix with range [from .. to] (including from and to), and calls Buffer.Visitor.visit(long, Object). If the visit() method returns false, the iteration is terminated.
        This method must be called with the lock held
        Specified by:
        forEach in class Buffer<T>
        Parameters:
        from - The starting seqno
        to - The ending seqno, the range is [from .. to] including from and to
        visitor - An instance of Visitor
        nullify - Nulls the visited element when true
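
The visitor-driven iteration can be sketched on a single flat array. VisitorSketch and its nested Visitor interface are hypothetical simplifications (the real Buffer.Visitor may take more arguments), and nulling an element only after a visit that returned true is an assumption.

```java
// Hypothetical sketch of range iteration with early termination; not the library's source.
final class VisitorSketch {

    @FunctionalInterface
    interface Visitor<T> {
        /** Returns false to stop the iteration. */
        boolean visit(long seqno, T element);
    }

    /** Visits seqnos in [from .. to], both inclusive; elements[0] holds firstSeqno. */
    static <T> void forEach(T[] elements, long firstSeqno, long from, long to,
                            Visitor<T> visitor, boolean nullify) {
        for (long seqno = from; seqno <= to; seqno++) {
            int index = (int) (seqno - firstSeqno);
            if (!visitor.visit(seqno, elements[index]))
                break; // visitor asked to terminate
            if (nullify)
                elements[index] = null;
        }
    }
}
```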
      • iterator

        public java.util.Iterator<T> iterator()
      • iterator

        public java.util.Iterator<T> iterator​(long from,
                                              long to)
        Specified by:
        iterator in class Buffer<T>
      • stream

        public java.util.stream.Stream<T> stream()
        Specified by:
        stream in class Buffer<T>
      • stream

        public java.util.stream.Stream<T> stream​(long from,
                                                 long to)
        Specified by:
        stream in class Buffer<T>
      • findHighestSeqno

        protected long findHighestSeqno​(java.util.List<LongTuple<T>> list)
      • findHighestSeqno

        protected static <T> long findHighestSeqno​(MessageBatch batch,
                                                   java.util.function.Function<T,​java.lang.Long> seqno_getter)
      • resize

        protected void resize​(long seqno)
        Moves rows down the matrix, by removing purged rows. If resizing to accommodate seqno is still needed, computes a new size. Then either moves existing rows down, or copies them into a new array (if resizing took place). The lock must be held by the caller of resize().
      • move

        protected void move​(int num_rows)
        Moves contents of matrix num_rows down. Avoids a System.arraycopy(). Caller must hold the lock.
      • _compact

        protected void _compact()
        Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly. The capacity of the matrix should be size * resize_factor. Caller must hold the lock.
      • getRow

        protected T[] getRow​(int index)
        Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist
        Parameters:
        index - The index of the row in the matrix
        Returns:
        A row
      • computeRow

        protected int computeRow​(long seqno)
        Computes and returns the row index for seqno. The caller must hold the lock.
      • computeIndex

        protected int computeIndex​(long seqno)
        Computes and returns the index within a row for seqno