Package org.jgroups.util
Class ForwardQueue
- java.lang.Object
-
- org.jgroups.util.ForwardQueue
-
public class ForwardQueue extends java.lang.Object
Forwards messages in FIFO order to a destination. Uses IDs to prevent duplicates. Used by FORWARD_TO_COORD.
- Since:
- 3.3
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
- protected class ForwardQueue.Flusher
-
Field Summary
Fields (Modifier and Type, Field, Description)
- protected Promise<java.lang.Long> ack_promise: Used for each resent message to wait until the message has been received
- protected java.util.concurrent.ConcurrentMap<Address,BoundedHashMap<java.lang.Long,java.lang.Long>> delivery_table
- protected int delivery_table_max_size: Size of the set to store received seqnos (for duplicate checking)
- protected Protocol down_prot
- protected ForwardQueue.Flusher flusher
- protected boolean flushing: Set when we block all sending threads to resend all messages from forward_table
- protected java.util.NavigableMap<java.lang.Long,Message> forward_table: Maintains messages forwarded to the target for which no ack has been received yet.
- protected java.util.concurrent.atomic.AtomicInteger in_flight_sends: Keeps track of the threads sending messages
- protected Address local_addr
- protected Log log
- protected boolean running
- protected java.util.concurrent.locks.Condition send_cond
- protected java.util.concurrent.locks.Lock send_lock
- protected Protocol up_prot
-
Constructor Summary
Constructors (Constructor, Description)
- ForwardQueue(Log log)
-
Method Summary
All Methods / Instance Methods / Concrete Methods (Modifier and Type, Method, Description)
- void ack(long id)
- protected void block()
- protected boolean canDeliver(Address sender, long seqno): Checks if seqno has already been received from sender.
- int deliveryTableSize(): Total size of all queues of the delivery table
- protected void doFlush(Address new_target)
- void flush(Address new_target, java.util.List<Address> mbrs)
- protected void flushMessagesInForwardTable(Address target): Sends all messages currently in forward_table to the new target (changing the dest field).
- int getDeliveryTableMaxSize()
- Protocol getDownProt()
- Address getLocalAddr()
- Protocol getUpProt()
- void receive(long id, Message msg)
- void send(long id, Message msg)
- void setDeliveryTableMaxSize(int max_size)
- void setDownProt(Protocol down_prot)
- void setLocalAddr(Address local_addr)
- void setUpProt(Protocol up_prot)
- int size()
- void start()
- protected void startFlusher(Address new_coord)
- void stop()
- protected void stopFlusher()
- protected void unblockAll()
-
-
-
Field Detail
-
up_prot
protected Protocol up_prot
-
down_prot
protected Protocol down_prot
-
local_addr
protected Address local_addr
-
forward_table
protected final java.util.NavigableMap<java.lang.Long,Message> forward_table
Maintains messages forwarded to the target for which no ack has been received yet. Needs to be sorted so we can resend them in the right order.
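The ordering requirement can be illustrated with a small stand-alone sketch. It does not use the JGroups classes: `PendingTable`, its methods, and the `String` payload are hypothetical stand-ins, and the choice of `ConcurrentSkipListMap` is an assumption (the field is documented only as a `NavigableMap`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for a table of unacknowledged messages, keyed by ID.
class PendingTable {
    // A sorted map iterates in ascending ID order regardless of insertion order.
    private final NavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    // Remember a message until it is acknowledged.
    void add(long id, String payload) {
        pending.put(id, payload);
    }

    // An ack removes the message, so it will not be resent.
    void ack(long id) {
        pending.remove(id);
    }

    // Payloads still awaiting an ack, in the order they would be resent.
    List<String> resendOrder() {
        return new ArrayList<>(pending.values());
    }

    int size() {
        return pending.size();
    }
}
```

With a sorted map, a resend loop only has to iterate the values; no separate sort step is needed when the target changes.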
-
send_lock
protected final java.util.concurrent.locks.Lock send_lock
-
send_cond
protected final java.util.concurrent.locks.Condition send_cond
-
flushing
protected volatile boolean flushing
Set when we block all sending threads to resend all messages from forward_table
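One way such a flag can work together with a lock and condition (compare send_lock and send_cond) is sketched below. This is an illustration under assumptions, not the class's actual code: `SendGate` and its method names are hypothetical, and the timeout parameter exists only to keep the example easy to exercise.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical gate that parks sender threads while a flush is in progress.
class SendGate {
    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private volatile boolean flushing;

    // Called by the flushing thread before it starts resending.
    void startFlush() {
        flushing = true;
    }

    // Called by the flushing thread when done; wakes all parked senders.
    void endFlush() {
        lock.lock();
        try {
            flushing = false;
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Called by a sender; waits while flushing is set, for at most timeoutMs.
    // Returns true if the sender may proceed, false on timeout or interrupt.
    boolean awaitOpen(long timeoutMs) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (flushing) {
                if (nanos <= 0)
                    return false;
                nanos = cond.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

The wait sits in a `while` loop because a condition wait can wake spuriously; the flag is rechecked after every wakeup.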
-
running
protected volatile boolean running
-
in_flight_sends
protected final java.util.concurrent.atomic.AtomicInteger in_flight_sends
Keeps track of the threads sending messages
-
delivery_table
protected final java.util.concurrent.ConcurrentMap<Address,BoundedHashMap<java.lang.Long,java.lang.Long>> delivery_table
-
flusher
protected volatile ForwardQueue.Flusher flusher
-
ack_promise
protected final Promise<java.lang.Long> ack_promise
Used for each resent message to wait until the message has been received
-
log
protected final Log log
-
delivery_table_max_size
protected int delivery_table_max_size
Size of the set to store received seqnos (for duplicate checking)
-
-
Constructor Detail
-
ForwardQueue
public ForwardQueue(Log log)
-
-
Method Detail
-
getUpProt
public Protocol getUpProt()
-
setUpProt
public void setUpProt(Protocol up_prot)
-
getDownProt
public Protocol getDownProt()
-
setDownProt
public void setDownProt(Protocol down_prot)
-
getLocalAddr
public Address getLocalAddr()
-
setLocalAddr
public void setLocalAddr(Address local_addr)
-
getDeliveryTableMaxSize
public int getDeliveryTableMaxSize()
-
setDeliveryTableMaxSize
public void setDeliveryTableMaxSize(int max_size)
-
deliveryTableSize
public int deliveryTableSize()
Total size of all queues of the delivery table
-
start
public void start()
-
stop
public void stop()
-
send
public void send(long id, Message msg)
-
receive
public void receive(long id, Message msg)
-
ack
public void ack(long id)
-
size
public int size()
-
doFlush
protected void doFlush(Address new_target) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
flushMessagesInForwardTable
protected void flushMessagesInForwardTable(Address target)
Sends all messages currently in forward_table to the new target (changing the dest field). This needs to be done so that the underlying reliable unicast protocol (e.g. UNICAST) adds these messages to its retransmission mechanism.
Note that we need to resend the messages in order of their seqnos! We also need to prevent other messages from being inserted until we're done, which is why there's synchronization.
Access to the forward_table doesn't need to be synchronized, as there won't be any insertions during flushing (all down-threads are blocked).
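The resend step described above can be sketched in isolation as follows. `Msg`, `Resender`, and the `Consumer` that stands in for the down protocol are hypothetical names introduced for illustration; the real method works on JGroups `Message` and `Address` objects, and this sketch does not model waiting for acks.

```java
import java.util.NavigableMap;
import java.util.function.Consumer;

// Hypothetical minimal message: just a destination and a payload.
class Msg {
    final String dest;
    final String payload;

    Msg(String dest, String payload) {
        this.dest = dest;
        this.payload = payload;
    }

    // Returns a copy addressed to a different destination.
    Msg withDest(String newDest) {
        return new Msg(newDest, payload);
    }
}

class Resender {
    // Re-addresses every pending message to newTarget and hands it to 'down'
    // in ascending ID order. Assumes no concurrent inserts into 'pending',
    // mirroring the note that all sending threads are blocked during a flush.
    static void resendAll(NavigableMap<Long, Msg> pending, String newTarget, Consumer<Msg> down) {
        for (Msg m : pending.values())
            down.accept(m.withDest(newTarget));
    }
}
```

Because `NavigableMap.values()` iterates in ascending key order, the messages reach the new target in seqno order without any extra sorting.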
-
canDeliver
protected boolean canDeliver(Address sender, long seqno)
Checks if seqno has already been received from sender. This weeds out duplicates. Note that this method is never called concurrently for the same sender.
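A duplicate check of this kind, backed by a bounded per-sender table (compare delivery_table and delivery_table_max_size), might look like the sketch below. It is an assumption-laden illustration: `DuplicateFilter` is a hypothetical class, senders are plain strings, and a `LinkedHashMap` with `removeEldestEntry` stands in for `BoundedHashMap`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical duplicate filter: remembers the most recent seqnos per sender.
class DuplicateFilter {
    private final int maxSize;
    private final ConcurrentMap<String, Map<Long, Long>> seen = new ConcurrentHashMap<>();

    DuplicateFilter(int maxSize) {
        this.maxSize = maxSize;
    }

    // Returns true the first time (sender, seqno) is seen, false for a duplicate.
    // Like the documented method, assumes no concurrent calls for the same sender.
    boolean canDeliver(String sender, long seqno) {
        Map<Long, Long> seqnos = seen.computeIfAbsent(sender, s -> newBoundedMap());
        return seqnos.putIfAbsent(seqno, seqno) == null;
    }

    // Total number of remembered seqnos across all senders.
    int totalSize() {
        int total = 0;
        for (Map<Long, Long> m : seen.values())
            total += m.size();
        return total;
    }

    // Insertion-ordered map that evicts its oldest entry once maxSize is exceeded.
    private Map<Long, Long> newBoundedMap() {
        return new LinkedHashMap<Long, Long>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Long> eldest) {
                return size() > maxSize;
            }
        };
    }
}
```

Bounding the table caps memory per sender, at the cost that in this sketch a seqno older than the last `maxSize` entries would be accepted again.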
-
block
protected void block()
-
unblockAll
protected void unblockAll()
-
startFlusher
protected void startFlusher(Address new_coord)
-
stopFlusher
protected void stopFlusher()
-
-