Class ConcurrentBlockingRingBuffer<T>

  • All Implemented Interfaces:
    java.lang.Iterable<T>, java.util.Collection<T>, java.util.concurrent.BlockingQueue<T>, java.util.Queue<T>

    public class ConcurrentBlockingRingBuffer<T>
    extends java.lang.Object
    implements java.util.concurrent.BlockingQueue<T>
    MPSC queue, based on a ring buffer implementation which optionally blocks on adding or removing of elements.
    The main fields are read-index (ri), write-index (wi) and size (all AtomicIntegers). Producers change wi and size, the single consumer changes ri and size.
    A producer tries to increment size. If unsuccessful, it drops the message (or blocks). If successful, it increments wi and writes the element to array[wi]. If size was incremented from 0 -> 1, a producer also wakes up the consumer.
    The single consumer tries to remove size elements (drainTo() or poll()), but returns when a null element is found (see below). It then decrements size by the number of removed elements (N for drainTo() and 1 for a successful poll()). If no elements are in the ring buffer, the consumer blocks (if configured), until it is woken up by a producer adding the first element to the empty queue.
    Note that this class is designed for a single consumer and has undefined behavior if multiple consumers are used.
    There is a special case that has the consumer busy-polling (ri=wi=1): assume we have producers P1-P3, each adding an element concurrently. P3 successfully incremented size from 0->1 and set wi=3. P2 also incremented size from 1->2 and set wi=1, but didn't yet write to the array. P3 incremented size from 2->3 and set wi=2, but also didn't write to the array yet. P1 woke up the consumer, but the consumer saw array[1]=null, array[2]=null, array[3]=el3 (written by P3). The consumer therefore returns on the first element because it is null, but continues looping because size=3. Only when P1 and P2 write their respective elements will the consumer be able to make progress.
    Since:
    5.5.0
    Author:
    Bela Ban
    • Field Summary

      Fields 
      Modifier and Type Field Description
      protected java.util.concurrent.atomic.AtomicReferenceArray<T> array  
      protected boolean block_on_empty  
      protected boolean block_on_full  
      protected int capacity  
      protected static java.util.function.IntUnaryOperator DECR  
      protected static java.util.function.IntBinaryOperator DECR_DELTA  
      protected java.util.function.IntUnaryOperator INCR  
      protected java.util.function.IntUnaryOperator INCR_INDEX  
      protected java.util.concurrent.locks.Lock lock  
      protected java.util.concurrent.locks.Condition not_empty  
      protected java.util.concurrent.locks.Condition not_full  
      protected int ri  
      protected java.util.concurrent.atomic.AtomicInteger size  
      protected java.util.concurrent.atomic.AtomicInteger wi  
    • Constructor Summary

      Constructors 
      Constructor Description
      ConcurrentBlockingRingBuffer​(int capacity, boolean block_on_empty, boolean block_on_full)  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      boolean add​(T t)  
      boolean addAll​(java.util.Collection<? extends T> c)  
      protected int advance​(int idx)  
      protected int advance​(int idx, int delta)  
      void clear()  
      boolean contains​(java.lang.Object o)  
      boolean containsAll​(java.util.Collection<?> c)  
      int drainTo​(java.util.Collection<? super T> c)  
      int drainTo​(java.util.Collection<? super T> c, int max)  
      T element()  
      boolean isEmpty()  
      java.util.Iterator<T> iterator()  
      boolean offer​(T t)  
      boolean offer​(T t, long timeout, java.util.concurrent.TimeUnit unit)  
      T peek()  
      T poll()  
      T poll​(long timeout, java.util.concurrent.TimeUnit unit)  
      void put​(T t)  
      int remainingCapacity()  
      T remove()  
      boolean remove​(java.lang.Object o)  
      boolean removeAll​(java.util.Collection<?> c)  
      boolean removeIf​(java.util.function.Predicate<? super T> filter)  
      boolean retainAll​(java.util.Collection<?> c)  
      protected void signalNotEmpty()  
      protected void signalNotFull()  
      int size()  
      T take()  
      java.lang.Object[] toArray()  
      <T1> T1[] toArray​(T1[] a)  
      java.lang.String toString()  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
      • Methods inherited from interface java.util.Collection

        equals, hashCode, parallelStream, spliterator, stream, toArray
      • Methods inherited from interface java.lang.Iterable

        forEach
    • Field Detail

      • capacity

        protected final int capacity
      • array

        protected final java.util.concurrent.atomic.AtomicReferenceArray<T> array
      • wi

        protected final java.util.concurrent.atomic.AtomicInteger wi
      • ri

        protected int ri
      • size

        protected final java.util.concurrent.atomic.AtomicInteger size
      • block_on_empty

        protected final boolean block_on_empty
      • block_on_full

        protected final boolean block_on_full
      • lock

        protected final java.util.concurrent.locks.Lock lock
      • not_empty

        protected final java.util.concurrent.locks.Condition not_empty
      • not_full

        protected final java.util.concurrent.locks.Condition not_full
      • INCR

        protected final java.util.function.IntUnaryOperator INCR
      • INCR_INDEX

        protected final java.util.function.IntUnaryOperator INCR_INDEX
      • DECR

        protected static final java.util.function.IntUnaryOperator DECR
      • DECR_DELTA

        protected static final java.util.function.IntBinaryOperator DECR_DELTA
    • Constructor Detail

      • ConcurrentBlockingRingBuffer

        public ConcurrentBlockingRingBuffer​(int capacity,
                                            boolean block_on_empty,
                                            boolean block_on_full)
    • Method Detail

      • put

        public void put​(T t)
                 throws java.lang.InterruptedException
        Specified by:
        put in interface java.util.concurrent.BlockingQueue<T>
        Throws:
        java.lang.InterruptedException
      • add

        public boolean add​(T t)
        Specified by:
        add in interface java.util.concurrent.BlockingQueue<T>
        Specified by:
        add in interface java.util.Collection<T>
        Specified by:
        add in interface java.util.Queue<T>
      • addAll

        public boolean addAll​(java.util.Collection<? extends T> c)
        Specified by:
        addAll in interface java.util.Collection<T>
      • offer

        public boolean offer​(T t)
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<T>
        Specified by:
        offer in interface java.util.Queue<T>
      • offer

        public boolean offer​(T t,
                             long timeout,
                             java.util.concurrent.TimeUnit unit)
                      throws java.lang.InterruptedException
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<T>
        Throws:
        java.lang.InterruptedException
      • clear

        public void clear()
        Specified by:
        clear in interface java.util.Collection<T>
      • isEmpty

        public boolean isEmpty()
        Specified by:
        isEmpty in interface java.util.Collection<T>
      • poll

        public T poll()
        Specified by:
        poll in interface java.util.Queue<T>
      • poll

        public T poll​(long timeout,
                      java.util.concurrent.TimeUnit unit)
               throws java.lang.InterruptedException
        Specified by:
        poll in interface java.util.concurrent.BlockingQueue<T>
        Throws:
        java.lang.InterruptedException
      • drainTo

        public int drainTo​(java.util.Collection<? super T> c,
                           int max)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<T>
      • drainTo

        public int drainTo​(java.util.Collection<? super T> c)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<T>
      • element

        public T element()
        Specified by:
        element in interface java.util.Queue<T>
      • peek

        public T peek()
        Specified by:
        peek in interface java.util.Queue<T>
      • take

        public T take()
               throws java.lang.InterruptedException
        Specified by:
        take in interface java.util.concurrent.BlockingQueue<T>
        Throws:
        java.lang.InterruptedException
      • remove

        public T remove()
        Specified by:
        remove in interface java.util.Queue<T>
      • remove

        public boolean remove​(java.lang.Object o)
        Specified by:
        remove in interface java.util.concurrent.BlockingQueue<T>
        Specified by:
        remove in interface java.util.Collection<T>
      • containsAll

        public boolean containsAll​(java.util.Collection<?> c)
        Specified by:
        containsAll in interface java.util.Collection<T>
      • contains

        public boolean contains​(java.lang.Object o)
        Specified by:
        contains in interface java.util.concurrent.BlockingQueue<T>
        Specified by:
        contains in interface java.util.Collection<T>
      • iterator

        public java.util.Iterator<T> iterator()
        Specified by:
        iterator in interface java.util.Collection<T>
        Specified by:
        iterator in interface java.lang.Iterable<T>
      • toArray

        public java.lang.Object[] toArray()
        Specified by:
        toArray in interface java.util.Collection<T>
      • toArray

        public <T1> T1[] toArray​(T1[] a)
        Specified by:
        toArray in interface java.util.Collection<T>
      • removeAll

        public boolean removeAll​(java.util.Collection<?> c)
        Specified by:
        removeAll in interface java.util.Collection<T>
      • removeIf

        public boolean removeIf​(java.util.function.Predicate<? super T> filter)
        Specified by:
        removeIf in interface java.util.Collection<T>
      • retainAll

        public boolean retainAll​(java.util.Collection<?> c)
        Specified by:
        retainAll in interface java.util.Collection<T>
      • size

        public int size()
        Specified by:
        size in interface java.util.Collection<T>
      • remainingCapacity

        public int remainingCapacity()
        Specified by:
        remainingCapacity in interface java.util.concurrent.BlockingQueue<T>
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object
      • advance

        protected int advance​(int idx)
      • advance

        protected int advance​(int idx,
                              int delta)
      • signalNotEmpty

        protected void signalNotEmpty()
      • signalNotFull

        protected void signalNotFull()