Package org.jgroups.protocols
Class BatchBundler
- java.lang.Object
-
- org.jgroups.protocols.BaseBundler
-
- org.jgroups.protocols.BatchBundler
-
- All Implemented Interfaces:
Bundler
public class BatchBundler extends BaseBundler
Bundler based onBATCH
. Batches messages, keeping aBaseBundler.max_size
for every destination. When the accumulated size of the messages for a given destination P would exceed max_bytes, a MessageBatch is created and sent to P.
Additionally, a timer runs everyflush_interval
milliseconds, sending messages whose size hasn't yet reached max_size.
Contrary toTransferQueueBundler
, which maintains a max_size for all messages,BatchBundler
maintains it for every destination separately. This causes batches to be fuller than withTransferQueueBundler
: assuming 4 members, everyone sending to everyone else, and max_size = 60000: with TransferQueueBundler, a batch is sent with ~15'000 bytes of messages (60'000/4), but with BatchBundler, it has ~60'000 bytes. Fuller batches means more amortization of costs of handling single messages.- Since:
- 5.2
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
BatchBundler.Buffer
protected class
BatchBundler.FlushTask
-
Field Summary
Fields Modifier and Type Field Description protected long
flush_interval
protected java.util.concurrent.Future<?>
flush_task
protected Address
local_addr
int
max_batch_size
protected java.util.concurrent.ConcurrentMap<Address,BatchBundler.Buffer>
msgMap
protected NullAddress
nullAddress
protected long
num_ebs_sent
protected long
num_ebs_sent_due_to_full_queue
protected long
num_ebs_sent_due_to_max_number_of_msgs
protected long
num_ebs_sent_due_to_timeout
protected long
num_msgs_sent
protected boolean
running
protected TimeScheduler
timer
-
Fields inherited from class org.jgroups.protocols.BaseBundler
avg_send_time, capacity, count, FUNC, lock, log, max_size, msg_processing_policy, msg_stats, msgs, output, transport
-
-
Constructor Summary
Constructors Constructor Description BatchBundler()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description protected void
_send(Message msg)
double
avgBatchSize()
void
flush()
int
getCapacity()
If the bundler implementation supports a capacity (e.g.int
getQueueSize()
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.void
resetStats()
void
send(Message msg)
void
start()
Called afterBundler.init(TP)
protected void
startFlushTask()
void
stop()
protected void
stopFlushTask()
void
viewChange(View view)
-
Methods inherited from class org.jgroups.protocols.BaseBundler
addMessage, getMaxSize, init, loopback, loopback, loopbackUnlessDontLoopbackIsSet, printBuffers, sendBundledMessages, sendMessageList, sendMessageList, sendMultiple, sendMultiple, sendSingle, sendSingleMessage, setCapacity, setMaxSize, size
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.jgroups.protocols.Bundler
renameThread
-
-
-
-
Field Detail
-
flush_interval
protected long flush_interval
-
max_batch_size
public int max_batch_size
-
local_addr
protected volatile Address local_addr
-
num_msgs_sent
protected long num_msgs_sent
-
num_ebs_sent
protected long num_ebs_sent
-
num_ebs_sent_due_to_full_queue
protected long num_ebs_sent_due_to_full_queue
-
num_ebs_sent_due_to_max_number_of_msgs
protected long num_ebs_sent_due_to_max_number_of_msgs
-
num_ebs_sent_due_to_timeout
protected long num_ebs_sent_due_to_timeout
-
msgMap
protected java.util.concurrent.ConcurrentMap<Address,BatchBundler.Buffer> msgMap
-
nullAddress
protected final NullAddress nullAddress
-
timer
protected TimeScheduler timer
-
running
protected volatile boolean running
-
flush_task
protected java.util.concurrent.Future<?> flush_task
-
-
Method Detail
-
avgBatchSize
public double avgBatchSize()
-
resetStats
public void resetStats()
- Specified by:
resetStats
in interfaceBundler
- Overrides:
resetStats
in classBaseBundler
-
viewChange
public void viewChange(View view)
- Specified by:
viewChange
in interfaceBundler
- Overrides:
viewChange
in classBaseBundler
-
start
public void start()
Description copied from interface:Bundler
Called afterBundler.init(TP)
- Specified by:
start
in interfaceBundler
- Overrides:
start
in classBaseBundler
-
stop
public void stop()
- Specified by:
stop
in interfaceBundler
- Overrides:
stop
in classBaseBundler
-
send
public void send(Message msg) throws java.lang.Exception
- Specified by:
send
in interfaceBundler
- Overrides:
send
in classBaseBundler
- Throws:
java.lang.Exception
-
getQueueSize
public int getQueueSize()
Description copied from interface:Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.- Specified by:
getQueueSize
in interfaceBundler
- Overrides:
getQueueSize
in classBaseBundler
-
getCapacity
public int getCapacity()
Description copied from interface:Bundler
If the bundler implementation supports a capacity (e.g.RingBufferBundler
, then return it, else return -1- Specified by:
getCapacity
in interfaceBundler
- Overrides:
getCapacity
in classBaseBundler
-
_send
protected void _send(Message msg)
-
startFlushTask
protected void startFlushTask()
-
stopFlushTask
protected void stopFlushTask()
-
flush
public void flush()
-
-