Package org.jgroups.protocols
Class RingBufferBundlerLockless
- java.lang.Object
 - org.jgroups.protocols.BaseBundler
  - org.jgroups.protocols.RingBufferBundlerLockless
- All Implemented Interfaces:
 Bundler
public class RingBufferBundlerLockless extends BaseBundler
Bundler which doesn't use locks but relies on CAS (compare-and-swap). There is one reader thread, which gets unparked by exactly one writer when the max size has been exceeded or when no other threads are sending messages.
- Since:
 4.0
- Author:
 Bela Ban
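
The two ideas in the class description (claiming buffer slots with CAS instead of locks, and letting exactly one writer perform the wake-up) can be illustrated with a minimal sketch. This is not the JGroups implementation: the class `CasSketch` and the methods `claimSlot`, `electWaker` and `run` are hypothetical names chosen for illustration, and only the `AtomicInteger` and `AtomicBoolean` calls are real JDK API. The actual parking and unparking of the reader thread is left out so that the example stays deterministic.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not taken from the JGroups sources.
final class CasSketch {

    /** Claims the next free slot without locking; returns its index, or -1 if all slots are taken. */
    static int claimSlot(AtomicInteger claimed, int capacity) {
        for (;;) {
            int current = claimed.get();
            if (current >= capacity)
                return -1;                                   // buffer full
            if (claimed.compareAndSet(current, current + 1))
                return current;                              // this thread won the CAS
            // another thread changed the counter first: retry
        }
    }

    /** Returns true for exactly one caller: the writer that would unpark the reader. */
    static boolean electWaker(AtomicBoolean unparking) {
        return unparking.compareAndSet(false, true);
    }

    /** Runs concurrent writers; returns {number of distinct slots claimed, number of wakers elected}. */
    static int[] run(int threads, int attempts, int capacity) {
        AtomicInteger claimed = new AtomicInteger();
        AtomicBoolean unparking = new AtomicBoolean();
        AtomicInteger wakers = new AtomicInteger();
        Set<Integer> slots = ConcurrentHashMap.newKeySet();
        Thread[] writers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            writers[i] = new Thread(() -> {
                for (int j = 0; j < attempts; j++) {
                    int slot = claimSlot(claimed, capacity);
                    if (slot >= 0)
                        slots.add(slot);                     // each index is handed out once
                    else if (electWaker(unparking))
                        wakers.incrementAndGet();            // only one writer gets here
                }
            });
            writers[i].start();
        }
        try {
            for (Thread t : writers)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {slots.size(), wakers.get()};
    }

    public static void main(String[] args) {
        int[] result = run(4, 100, 64);
        System.out.println("slots claimed=" + result[0] + ", wakers elected=" + result[1]);
    }
}
```

With 4 writers making 100 attempts each against 64 slots, every slot is claimed exactly once and a single writer wins the wake-up election, regardless of thread scheduling.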
 
 
- 
- 
Field Summary
Fields
Modifier and Type                                      Field               Description
protected java.util.concurrent.atomic.AtomicLong       accumulated_bytes
protected Message[]                                    buf
protected Runner                                       bundler_thread
protected java.util.concurrent.atomic.AtomicInteger    num_threads
protected int                                          read_index
protected java.lang.Runnable                           run_function
protected java.util.concurrent.atomic.AtomicInteger    size
protected static java.lang.String                      THREAD_NAME
protected java.util.concurrent.atomic.AtomicInteger    tmp_write_index
protected java.util.concurrent.atomic.AtomicBoolean    unparking
protected int                                          write_index
protected java.util.concurrent.atomic.AtomicInteger    write_permits
- 
Constructor Summary
Constructors
Constructor                                Description
RingBufferBundlerLockless()
RingBufferBundlerLockless(int capacity)
- 
Method Summary
All Methods  Static Methods  Instance Methods  Concrete Methods
Modifier and Type     Method
                          Description
int                   _readMessages()
protected int         advanceWriteIndex()
protected static int  assertPositive(int value, java.lang.String message)
protected int         getPermitToWrite()
int                   getQueueSize()
                          If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.
protected int         getWriteIndex()
protected int         increment(int index)
protected int         index(int idx)
void                  init(TP transport)
                          Called after creation of the bundler
protected int         marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int available_msgs, int max_bundle_size)
int                   readIndex()
protected void        readMessages()
void                  reset()
void                  send(Message msg)
protected int         sendBundledMessages(Message[] buf, int read_index, int available_msgs)
                          Read and send messages in range [read-index ..
int                   size()
                          Returns the total number of messages in the hashmap
void                  start()
                          Called after Bundler.init(TP)
void                  stop()
java.lang.String      toString()
int                   writeIndex()
- 
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, clearMessages, sendBundledMessages, sendMessageList, sendSingleMessage, viewChange 
- 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait 
- 
Methods inherited from interface org.jgroups.protocols.Bundler
getStats, resetStats 
- 
Field Detail
- 
buf
protected Message[] buf
 
- 
read_index
protected int read_index
 
- 
write_index
protected volatile int write_index
 
- 
tmp_write_index
protected final java.util.concurrent.atomic.AtomicInteger tmp_write_index
 
- 
write_permits
protected final java.util.concurrent.atomic.AtomicInteger write_permits
 
- 
size
protected final java.util.concurrent.atomic.AtomicInteger size
 
- 
num_threads
protected final java.util.concurrent.atomic.AtomicInteger num_threads
 
- 
accumulated_bytes
protected final java.util.concurrent.atomic.AtomicLong accumulated_bytes
 
- 
unparking
protected final java.util.concurrent.atomic.AtomicBoolean unparking
 
- 
bundler_thread
protected Runner bundler_thread
 
- 
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
 - Constant Field Values
 
 
- 
run_function
protected final java.lang.Runnable run_function
 
- 
Method Detail
- 
readIndex
public int readIndex()
 
- 
writeIndex
public int writeIndex()
 
- 
size
public int size()
Description copied from class: BaseBundler
Returns the total number of messages in the hashmap
- Specified by:
 size in interface Bundler
- Overrides:
 size in class BaseBundler
 
- 
getQueueSize
public int getQueueSize()
Description copied from interface: Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent. 
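
How a caller might interpret the return value can be sketched as follows. This is an assumption-laden illustration, not JGroups code: `QueueSizeSketch`, `dropProbability` and both thresholds are hypothetical, and the linear ramp is a simplification of Random Early Detection (which normally works on an averaged queue size). The only part taken from the description above is the convention that -1 means the queue is not managed.

```java
// Hypothetical illustration only; not taken from the JGroups sources.
final class QueueSizeSketch {

    /**
     * Maps a queue size (as a getQueueSize()-style method would return it) to a drop
     * probability in [0, 1]. A negative size means "no managed queue", so nothing is dropped.
     */
    static double dropProbability(int queueSize, int minThreshold, int maxThreshold) {
        if (queueSize < 0)
            return 0.0;                      // -1: the queue is not managed
        if (queueSize <= minThreshold)
            return 0.0;                      // queue is short: never drop
        if (queueSize >= maxThreshold)
            return 1.0;                      // queue is too long: always drop
        // linear ramp between the two thresholds
        return (double) (queueSize - minThreshold) / (maxThreshold - minThreshold);
    }

    public static void main(String[] args) {
        System.out.println(dropProbability(-1, 50, 100));   // 0.0
        System.out.println(dropProbability(75, 50, 100));   // 0.5
        System.out.println(dropProbability(120, 50, 100));  // 1.0
    }
}
```

Because such a check may run for every message sent, the size lookup itself has to be cheap, which is the reason for the performance note above.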
- 
init
public void init(TP transport)
Description copied from interface: Bundler
Called after creation of the bundler
- Specified by:
 init in interface Bundler
- Overrides:
 init in class BaseBundler
- Parameters:
 transport - the transport, for further reference
 
- 
reset
public void reset()
 
- 
start
public void start()
Description copied from interface: Bundler
Called after Bundler.init(TP)
- Specified by:
 start in interface Bundler
- Overrides:
 start in class BaseBundler
 
- 
stop
public void stop()
- Specified by:
 stop in interface Bundler
- Overrides:
 stop in class BaseBundler
 
- 
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
 send in interface Bundler
- Overrides:
 send in class BaseBundler
- Throws:
 java.lang.Exception
 
- 
getWriteIndex
protected int getWriteIndex()
 
- 
getPermitToWrite
protected int getPermitToWrite()
 
- 
advanceWriteIndex
protected int advanceWriteIndex()
 
- 
readMessages
protected void readMessages()
 
- 
sendBundledMessages
protected int sendBundledMessages(Message[] buf, int read_index, int available_msgs)
Read and send messages in range [read-index .. read-index+available_msgs-1] 
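
In a ring buffer, the range [read-index .. read-index+available_msgs-1] can run past the end of the array and continue at slot 0. The sketch below shows which slots such a range covers. It assumes that indices wrap modulo the buffer capacity; the page does not say how `index(int)` or `increment(int)` are implemented, so `RingRangeSketch`, its `index` helper and `slotsInRange` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not taken from the JGroups sources.
final class RingRangeSketch {

    /** Maps a logical index onto a slot of a buffer with the given capacity (assumed: modulo wrap). */
    static int index(int idx, int capacity) {
        return Math.floorMod(idx, capacity);
    }

    /** Returns the slots covered by the range [readIndex .. readIndex + availableMsgs - 1]. */
    static List<Integer> slotsInRange(int readIndex, int availableMsgs, int capacity) {
        List<Integer> slots = new ArrayList<>(availableMsgs);
        for (int i = 0; i < availableMsgs; i++)
            slots.add(index(readIndex + i, capacity));   // wraps to 0 after the last slot
        return slots;
    }

    public static void main(String[] args) {
        // 4 messages starting at slot 6 of an 8-slot buffer
        System.out.println(slotsInRange(6, 4, 8));       // prints [6, 7, 0, 1]
    }
}
```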
- 
toString
public java.lang.String toString()
- Overrides:
 toString in class java.lang.Object
 
- 
_readMessages
public int _readMessages()
 
- 
marshalMessagesToSameDestination
protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int available_msgs, int max_bundle_size) throws java.lang.Exception
- Throws:
 java.lang.Exception
 
- 
increment
protected final int increment(int index)
 
- 
index
protected final int index(int idx)
 
- 
assertPositive
protected static int assertPositive(int value, java.lang.String message) 