Package org.jgroups.protocols
Class TransferQueueBundler2
- java.lang.Object
-
- org.jgroups.protocols.TransferQueueBundler2
-
- All Implemented Interfaces:
java.lang.Runnable
,Bundler
public class TransferQueueBundler2 extends java.lang.Object implements Bundler, java.lang.Runnable
This bundler adds all (unicast or multicast) messages to a queue until max size has been exceeded, but does send messages immediately when no other messages are available. https://issues.redhat.com/browse/JGRP-1540
The difference toTransferQueueBundler
is that a size is maintainedper destination
and we maintain byte arrays of max_bundle_size per destination into which we marshall a message directly when it is sent.
-
-
Field Summary
Fields Modifier and Type Field Description protected AverageMinMax
avg_fill_count
protected java.lang.Thread
bundler_thread
protected int
capacity
protected Log
log
protected int
max_size
Maximum number of bytes for messages to be queued until they are sent.protected java.util.Map<Address,org.jgroups.protocols.TransferQueueBundler2.Buffer>
messages
protected static NullAddress
NIL
protected long
num_sends_because_full_queue
protected long
num_sends_because_no_msgs
protected long
poll_timeout
protected java.util.concurrent.BlockingQueue<Message>
queue
protected java.util.List<Message>
remove_queue
protected boolean
running
protected static java.lang.String
THREAD_NAME
protected TP
transport
-
Constructor Summary
Constructors Modifier Constructor Description TransferQueueBundler2()
TransferQueueBundler2(int capacity)
protected
TransferQueueBundler2(java.util.concurrent.BlockingQueue<Message> queue)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected void
addAndSendIfSizeExceeded(Message msg)
protected static int
assertPositive(int value, java.lang.String message)
protected void
drain()
Takes all messages from the queue, adds them to the hashmap and then sends all bundled messagesjava.lang.String
dump()
int
getCapacity()
If the bundler implementation supports a capacity (e.g.int
getMaxSize()
Maximum number of bytes for messages to be queued until they are sentint
getQueueSize()
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1.protected boolean
hasMessages()
void
init(TP transport)
Called after creation of the bundlerint
removeQueueSize()
TransferQueueBundler2
removeQueueSize(int size)
void
renameThread()
void
resetStats()
void
run()
void
send(Message msg)
protected void
sendBundledMessages()
Bundler
setCapacity(int c)
Bundler
setMaxSize(int s)
int
size()
The number of unsent messages in the bundlervoid
start()
Called afterBundler.init(TP)
void
stop()
void
viewChange(View view)
-
-
-
Field Detail
-
max_size
protected int max_size
Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller than the largest datagram packet size in case of UDP
-
capacity
protected int capacity
-
poll_timeout
protected long poll_timeout
-
transport
protected TP transport
-
log
protected Log log
-
queue
protected java.util.concurrent.BlockingQueue<Message> queue
-
remove_queue
protected java.util.List<Message> remove_queue
-
bundler_thread
protected volatile java.lang.Thread bundler_thread
-
running
protected volatile boolean running
-
num_sends_because_full_queue
protected long num_sends_because_full_queue
-
num_sends_because_no_msgs
protected long num_sends_because_no_msgs
-
avg_fill_count
protected final AverageMinMax avg_fill_count
-
THREAD_NAME
protected static final java.lang.String THREAD_NAME
- See Also:
- Constant Field Values
-
messages
protected final java.util.Map<Address,org.jgroups.protocols.TransferQueueBundler2.Buffer> messages
-
NIL
protected static final NullAddress NIL
-
-
Constructor Detail
-
TransferQueueBundler2
public TransferQueueBundler2()
-
TransferQueueBundler2
protected TransferQueueBundler2(java.util.concurrent.BlockingQueue<Message> queue)
-
TransferQueueBundler2
public TransferQueueBundler2(int capacity)
-
-
Method Detail
-
getCapacity
public int getCapacity()
Description copied from interface:Bundler
If the bundler implementation supports a capacity (e.g.RingBufferBundler
, then return it, else return -1- Specified by:
getCapacity
in interfaceBundler
-
setCapacity
public Bundler setCapacity(int c)
-
getMaxSize
public int getMaxSize()
Description copied from interface:Bundler
Maximum number of bytes for messages to be queued until they are sent- Specified by:
getMaxSize
in interfaceBundler
-
setMaxSize
public Bundler setMaxSize(int s)
- Specified by:
setMaxSize
in interfaceBundler
-
getQueueSize
public int getQueueSize()
Description copied from interface:Bundler
If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
This method needs to be fast as it might get called on every message to be sent.- Specified by:
getQueueSize
in interfaceBundler
-
removeQueueSize
public int removeQueueSize()
-
removeQueueSize
public TransferQueueBundler2 removeQueueSize(int size)
-
dump
public java.lang.String dump()
-
init
public void init(TP transport)
Description copied from interface:Bundler
Called after creation of the bundler
-
resetStats
public void resetStats()
- Specified by:
resetStats
in interfaceBundler
-
viewChange
public void viewChange(View view)
- Specified by:
viewChange
in interfaceBundler
-
start
public void start()
Description copied from interface:Bundler
Called afterBundler.init(TP)
-
renameThread
public void renameThread()
- Specified by:
renameThread
in interfaceBundler
-
size
public int size()
Description copied from interface:Bundler
The number of unsent messages in the bundler
-
send
public void send(Message msg) throws java.lang.Exception
-
run
public void run()
- Specified by:
run
in interfacejava.lang.Runnable
-
hasMessages
protected boolean hasMessages()
-
addAndSendIfSizeExceeded
protected void addAndSendIfSizeExceeded(Message msg)
-
sendBundledMessages
protected void sendBundledMessages()
-
drain
protected void drain()
Takes all messages from the queue, adds them to the hashmap and then sends all bundled messages
-
assertPositive
protected static int assertPositive(int value, java.lang.String message)
-
-