Class RingBufferSeqno<T>

  • All Implemented Interfaces:
    java.lang.Iterable<T>

    public class RingBufferSeqno<T>
    extends java.lang.Object
    implements java.lang.Iterable<T>
    Ring buffer implemented with a circular array. Designed for multiple producers (add()) and a single consumer (remove()). The remove() methods are not reentrant, so using more than one consumer thread will not work correctly.

    The buffer has a fixed capacity, and a low (LOW), highest delivered (HD) and highest received (HR) seqno.

    An element whose sequence number (seqno) is > LOW + capacity or < HD is discarded.

    Elements are added after HD, but cannot wrap around beyond LOW. Addition doesn't need to be sequential, e.g. adding 5, 6, 8 is OK (as long as a seqno doesn't pass LOW). Addition may advance HR. Addition of elements that are already present is a no-op, and will not set the element again.

    Removal of elements starts at HD+1; any non-null element is removed and HD is advanced accordingly. If a remove method is called with nullify=true, the removed elements are nulled and LOW is advanced as well (LOW=HD). All removals on a given RingBuffer must use the same nullify value: either always true or always false. Mixing nullify=true and nullify=false removals on the same RingBuffer is not permitted.

    The stable(long) method is called periodically; it nulls all elements between LOW and HD and advances LOW to HD.

    The design of RingBuffer is discussed in doc/design/RingBufferSeqno.txt.
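
    The rules above can be modelled with a small single-threaded sketch. It is not the JGroups implementation: the class name SeqnoWindowSketch, the index formula, and the choice to treat a seqno at or below HD as already delivered are assumptions made for illustration, and all locking is omitted.

```java
/** Hypothetical single-threaded model of the LOW/HD/HR rules; not the JGroups implementation. */
class SeqnoWindowSketch<T> {
    private final Object[] buf;   // capacity is assumed to be a power of 2
    private final long offset;
    long low, hd, hr;             // all three start at offset

    SeqnoWindowSketch(int capacity, long offset) {
        if (Integer.bitCount(capacity) != 1)
            throw new IllegalArgumentException("capacity must be a power of 2");
        this.buf = new Object[capacity];
        this.offset = offset;
        this.low = this.hd = this.hr = offset;
    }

    // Assumed mapping: seqno offset+1 lands in slot 0.
    int index(long seqno) {
        return (int) ((seqno - offset - 1) & (buf.length - 1));
    }

    boolean add(long seqno, T element) {
        if (seqno <= hd) return false;               // assumption: at or below HD counts as delivered
        if (seqno - low > buf.length) return false;  // would wrap around beyond LOW
        int i = index(seqno);
        if (buf[i] != null) return false;            // already present: no-op, element is not replaced
        buf[i] = element;
        hr = Math.max(hr, seqno);                    // addition may advance HR
        return true;
    }

    @SuppressWarnings("unchecked")
    T remove(boolean nullify) {
        long next = hd + 1;
        if (next > hr) return null;                  // nothing received beyond HD
        int i = index(next);
        T element = (T) buf[i];
        if (element == null) return null;            // gap at HD+1: nothing to deliver yet
        hd = next;
        if (nullify) { buf[i] = null; low = hd; }    // LOW=HD when nullifying
        return element;
    }

    void stable(long seqno) {
        if (seqno <= low) return;
        if (seqno > hd) throw new IllegalArgumentException("seqno " + seqno + " > hd " + hd);
        for (long s = low + 1; s <= seqno; s++) buf[index(s)] = null;
        low = seqno;                                 // frees slots for new additions
    }
}
```

    With capacity 4 and offset 0, adding 1, 2 and 4 succeeds and leaves a gap at 3; adding 5 is rejected until stable(2) has advanced LOW, after which seqno 5 reuses the slot that held seqno 1.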

    Since:
    3.1
    Author:
    Bela Ban
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected T[] buf
      Atomic ref array so that elements can be checked for null and set atomically.
      protected java.util.concurrent.locks.Condition buffer_full  
      protected long hd
      The highest delivered seqno.
      protected long hr
      The highest received seqno.
      protected java.util.concurrent.locks.Lock lock
      Lock for adders to block on when the buffer is full
      protected long low
      The lowest seqno.
      protected long offset  
      protected java.util.concurrent.atomic.AtomicBoolean processing  
      protected boolean running  
    • Constructor Summary

      Constructors 
      Constructor Description
      RingBufferSeqno​(int capacity, long offset)
      Creates a RingBuffer
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      T _get​(long seqno)
      Only used for testing.
      boolean add​(long seqno, T element)  
      boolean add​(long seqno, T element, boolean block)
      Adds a new element to the buffer
      protected boolean block​(long seqno)  
      int capacity()  
      protected int count​(boolean missing)  
      void destroy()  
      T get​(long seqno)  
      java.util.List<T> get​(long from, long to)
      Returns a list of messages in the range [from .. to], including from and to
      long[] getDigest()  
      long getHighestDelivered()  
      long getHighestReceived()  
      long getLow()  
      SeqnoList getMissing()  
      java.util.concurrent.atomic.AtomicBoolean getProcessing()  
      protected int index​(long seqno)  
      java.util.Iterator<T> iterator()
      Returns an iterator over the elements of the ring buffer in the range [HD+1 .. HR]
      int missing()  
      T remove()
      Removes the next element (at hd +1).
      T remove​(boolean nullify)
      Removes the next element (at hd +1).
      java.util.List<T> removeMany​(boolean nullify, int max_results)  
      java.util.List<T> removeMany​(java.util.concurrent.atomic.AtomicBoolean processing, boolean nullify, int max_results)  
      double saturation()  
      void setHighestDelivered​(long hd)  
      int size()  
      int spaceUsed()  
      void stable​(long seqno)
      Nulls the elements between low and seqno and advances low
      java.lang.String toString()  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
      • Methods inherited from interface java.lang.Iterable

        forEach, spliterator
    • Field Detail

      • buf

        protected final T[] buf
        Atomic ref array so that elements can be checked for null and set atomically. Should always be sized to a power of 2.
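
        One common reason for a power-of-2 size is that a slot index can then be computed with a bit mask instead of a modulo. The sketch below illustrates that equivalence; the class PowerOfTwoIndexSketch and its helpers are hypothetical names, and whether this class rounds a requested capacity up is an assumption, not something stated here.

```java
/** Hypothetical helpers showing why a power-of-2 array size allows mask-based indexing. */
class PowerOfTwoIndexSketch {
    /** Rounds n up to the next power of 2 (valid for n up to 2^30). */
    static int nextPowerOfTwo(int n) {
        if (n <= 1) return 1;
        return Integer.highestOneBit(n - 1) << 1;
    }

    /** Slot index via bit mask; only correct when capacity is a power of 2. */
    static int maskIndex(long position, int capacity) {
        return (int) (position & (capacity - 1));
    }

    /** Slot index via floor modulo; works for any positive capacity but is slower. */
    static int modIndex(long position, int capacity) {
        return (int) Math.floorMod(position, (long) capacity);
    }
}
```

        For a power-of-2 capacity both index functions agree for every position, including negative ones, so the cheaper mask can be used.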
      • low

        protected long low
        The lowest seqno. Moved forward by stable()
      • hd

        protected long hd
        The highest delivered seqno. Moved forward by a remove method. The next message to be removed is hd +1
      • hr

        protected long hr
        The highest received seqno. Moved forward by add(). The next message to be added is hr +1
      • offset

        protected final long offset
      • lock

        protected final java.util.concurrent.locks.Lock lock
        Lock for adders to block on when the buffer is full
      • buffer_full

        protected final java.util.concurrent.locks.Condition buffer_full
      • running

        protected boolean running
      • processing

        protected final java.util.concurrent.atomic.AtomicBoolean processing
    • Constructor Detail

      • RingBufferSeqno

        public RingBufferSeqno​(int capacity,
                               long offset)
        Creates a RingBuffer
        Parameters:
        capacity - The number of elements the ring buffer's array should hold.
        offset - The offset. The first element to be added has to be offset +1.
    • Method Detail

      • getLow

        public long getLow()
      • getHighestDelivered

        public long getHighestDelivered()
      • setHighestDelivered

        public void setHighestDelivered​(long hd)
      • getHighestReceived

        public long getHighestReceived()
      • getDigest

        public long[] getDigest()
      • getProcessing

        public java.util.concurrent.atomic.AtomicBoolean getProcessing()
      • add

        public boolean add​(long seqno,
                           T element)
      • add

        public boolean add​(long seqno,
                           T element,
                           boolean block)
        Adds a new element to the buffer
        Parameters:
        seqno - The seqno of the element
        element - The element
        block - If true, add() blocks while the buffer is full and continues once there is space. Otherwise, add() returns immediately, either successfully or (if the buffer is full) unsuccessfully.
        Returns:
        True if the element was added, false otherwise.
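
        The blocking behaviour can be pictured as producers waiting on a Condition until the low end of the window moves forward. The following is a minimal sketch under assumptions: BlockingAddSketch, advanceLow() and demo() are hypothetical names, and the real class may wait, signal, or handle interruption differently.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of producers blocking on a full buffer; not the JGroups implementation. */
class BlockingAddSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition bufferFull = lock.newCondition();
    private final int capacity;
    private long low;               // guarded by lock
    private boolean running = true; // guarded by lock

    BlockingAddSketch(int capacity) { this.capacity = capacity; }

    /** Waits until seqno fits into the window above low; returns false after destroy(). */
    boolean block(long seqno) {
        lock.lock();
        try {
            while (running && seqno - low > capacity)
                bufferFull.awaitUninterruptibly(); // re-check the condition after every wake-up
            return running;
        } finally {
            lock.unlock();
        }
    }

    /** Moves low forward and wakes all waiting producers. */
    void advanceLow(long newLow) {
        lock.lock();
        try {
            if (newLow > low) { low = newLow; bufferFull.signalAll(); }
        } finally {
            lock.unlock();
        }
    }

    /** Stops the buffer and releases every blocked producer. */
    void destroy() {
        lock.lock();
        try { running = false; bufferFull.signalAll(); }
        finally { lock.unlock(); }
    }

    /** Demo: a producer needs seqno 5 in a window of 4 and proceeds once low reaches 1. */
    static boolean demo() {
        BlockingAddSketch s = new BlockingAddSketch(4);
        boolean[] result = new boolean[1];
        Thread producer = new Thread(() -> result[0] = s.block(5));
        producer.start();
        s.advanceLow(1);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }
}
```

        The wait sits in a while loop so that a producer woken by signalAll() re-checks whether its seqno now fits; a producer released by destroy() sees running == false and reports failure instead of adding.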
      • remove

        public T remove​(boolean nullify)
        Removes the next element (at hd +1). This method is not thread-safe: a RingBuffer can have only 1 remover thread active at any time.
        Parameters:
        nullify - Nulls the element in the array if true
        Returns:
        T if there was a non-null element at position hd +1, or null if the element at hd+1 was null, or hd+1 > hr.
      • remove

        public T remove()
        Removes the next element (at hd +1). This method is not thread-safe: a RingBuffer can have only 1 remover thread active at any time.
        Returns:
        T if there was a non-null element at position hd +1, or null if the element at hd+1 was null.
      • removeMany

        public java.util.List<T> removeMany​(boolean nullify,
                                            int max_results)
      • removeMany

        public java.util.List<T> removeMany​(java.util.concurrent.atomic.AtomicBoolean processing,
                                            boolean nullify,
                                            int max_results)
      • get

        public T get​(long seqno)
      • _get

        public T _get​(long seqno)
        Only used for testing.
      • get

        public java.util.List<T> get​(long from,
                                     long to)
        Returns a list of messages in the range [from .. to], including from and to
        Parameters:
        from - The first seqno of the range (inclusive)
        to - The last seqno of the range (inclusive)
        Returns:
        A list of messages, or null if none in range [from .. to] was found
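
        Because the result is null rather than an empty list when nothing is found, callers need a null check. The sketch below mimics that contract with a stand-in lookup function; RangeGetSketch and range() are hypothetical names, and the LongFunction parameter merely plays the role of get(long seqno).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;

/** Hypothetical illustration of an inclusive range lookup that returns null when empty. */
class RangeGetSketch {
    /** Collects the non-null elements for seqnos in [from .. to]; null if none were found. */
    static <T> List<T> range(LongFunction<T> lookup, long from, long to) {
        List<T> result = null;                 // created lazily, so "nothing found" stays null
        for (long seqno = from; seqno <= to; seqno++) {
            T element = lookup.apply(seqno);
            if (element == null) continue;     // missing seqnos are skipped
            if (result == null) result = new ArrayList<>();
            result.add(element);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> received = new HashMap<>();
        received.put(3L, "m3");
        received.put(5L, "m5");                // seqno 4 is missing

        List<String> found = range(seqno -> received.get(seqno), 3, 5);
        List<String> none = range(seqno -> received.get(seqno), 6, 9);

        System.out.println(found);
        System.out.println(none == null ? "nothing in range" : none.toString());
    }
}
```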
      • stable

        public void stable​(long seqno)
        Nulls the elements between low and seqno and advances low
      • destroy

        public void destroy()
      • capacity

        public final int capacity()
      • size

        public int size()
      • missing

        public int missing()
      • spaceUsed

        public int spaceUsed()
      • saturation

        public double saturation()
      • iterator

        public java.util.Iterator<T> iterator()
        Returns an iterator over the elements of the ring buffer in the range [HD+1 .. HR]
        Specified by:
        iterator in interface java.lang.Iterable<T>
        Returns:
        RingBufferIterator
        Throws:
        java.util.NoSuchElementException - if HD is moved forward during the iteration
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object
      • index

        protected int index​(long seqno)
      • block

        protected boolean block​(long seqno)
      • count

        protected int count​(boolean missing)