Package org.jgroups.util
Class ForwardQueue
- java.lang.Object
-
- org.jgroups.util.ForwardQueue
-
public class ForwardQueue extends java.lang.Object
Forwards messages in FIFO order to a destination. Uses IDs to prevent duplicates. Used by FORWARD_TO_COORD.
- Since:
- 3.3
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes
Modifier and Type Class Description
protected class ForwardQueue.Flusher
-
Field Summary
Fields
Modifier and Type Field Description
protected Promise<java.lang.Long> ack_promise
Used for each resent message to wait until the message has been received
protected java.util.concurrent.ConcurrentMap<Address,BoundedHashMap<java.lang.Long,java.lang.Long>> delivery_table
protected int delivery_table_max_size
Size of the set to store received seqnos (for duplicate checking)
protected Protocol down_prot
protected ForwardQueue.Flusher flusher
protected boolean flushing
Set when we block all sending threads to resend all messages from forward_table
protected java.util.NavigableMap<java.lang.Long,Message> forward_table
Maintains messages forwarded to the target for which no ack has been received yet.
protected java.util.concurrent.atomic.AtomicInteger in_flight_sends
Keeps track of the threads sending messages
protected Address local_addr
protected Log log
protected boolean running
protected java.util.concurrent.locks.Condition send_cond
protected java.util.concurrent.locks.Lock send_lock
protected Protocol up_prot
-
Constructor Summary
Constructors
Constructor Description
ForwardQueue(Log log)
-
Method Summary
Modifier and Type Method Description
void ack(long id)
protected void block()
protected boolean canDeliver(Address sender, long seqno)
Checks if seqno has already been received from sender.
int deliveryTableSize()
Total size of all queues of the delivery table
protected void doFlush(Address new_target)
void flush(Address new_target, java.util.List<Address> mbrs)
protected void flushMessagesInForwardTable(Address target)
Sends all messages currently in forward_table to the new target (changing the dest field).
int getDeliveryTableMaxSize()
Protocol getDownProt()
Address getLocalAddr()
Protocol getUpProt()
void receive(long id, Message msg)
void send(long id, Message msg)
void setDeliveryTableMaxSize(int max_size)
void setDownProt(Protocol down_prot)
void setLocalAddr(Address local_addr)
void setUpProt(Protocol up_prot)
int size()
void start()
protected void startFlusher(Address new_coord)
void stop()
protected void stopFlusher()
protected void unblockAll()
-
-
-
Field Detail
-
up_prot
protected Protocol up_prot
-
down_prot
protected Protocol down_prot
-
local_addr
protected Address local_addr
-
forward_table
protected final java.util.NavigableMap<java.lang.Long,Message> forward_table
Maintains messages forwarded to the target for which no ack has been received yet. Needs to be sorted so we can resend them in the right order.
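The role of a sorted table of unacked messages can be illustrated with a minimal, self-contained sketch. It is not the JGroups implementation: the class ForwardTableSketch, its method names, the use of String payloads in place of Message, and the choice of ConcurrentSkipListMap as the NavigableMap are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for a forward table; messages are plain strings here.
final class ForwardTableSketch {
    // Sorted by id, so iteration yields the oldest unacked message first.
    private final NavigableMap<Long, String> forwardTable = new ConcurrentSkipListMap<>();

    // Remember the message until the target acknowledges it.
    void send(long id, String payload) {
        forwardTable.put(id, payload);
    }

    // An ack means the target has the message, so it no longer needs resending.
    void ack(long id) {
        forwardTable.remove(id);
    }

    // Messages that would have to be resent, in ascending id order.
    List<String> pendingInOrder() {
        return new ArrayList<>(forwardTable.values());
    }

    // Number of messages still waiting for an ack.
    int size() {
        return forwardTable.size();
    }
}
```

Because the map is keyed by id and kept sorted, the insertion order of the entries does not matter: a resend always walks them from the lowest id upwards.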
-
send_lock
protected final java.util.concurrent.locks.Lock send_lock
-
send_cond
protected final java.util.concurrent.locks.Condition send_cond
-
flushing
protected volatile boolean flushing
Set when we block all sending threads to resend all messages from forward_table
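How a flag like flushing can work together with a lock and a condition such as send_lock and send_cond to hold back sending threads might look roughly as follows. This is a sketch under assumptions, not the actual code: SendGateSketch and all of its method names are hypothetical, and the timeout parameter exists only so the example cannot block forever.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical gate that makes senders wait while a flush is in progress.
final class SendGateSketch {
    private final Lock sendLock = new ReentrantLock();
    private final Condition sendCond = sendLock.newCondition();
    private volatile boolean flushing;

    // Called before resending: from now on, senders have to wait.
    void startFlush() {
        flushing = true;
    }

    // Called when resending is done: clear the flag and wake all waiting senders.
    void endFlush() {
        flushing = false;
        sendLock.lock();
        try {
            sendCond.signalAll();
        } finally {
            sendLock.unlock();
        }
    }

    // Waits until no flush is in progress; returns false on timeout or interrupt.
    boolean awaitNotFlushing(long timeoutMs) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        sendLock.lock();
        try {
            while (flushing) {            // loop guards against spurious wakeups
                if (nanos <= 0)
                    return false;
                nanos = sendCond.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            sendLock.unlock();
        }
    }
}
```

A sender would call awaitNotFlushing before adding its message to the table, so that no new entries appear while the existing ones are being resent.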
-
running
protected volatile boolean running
-
in_flight_sends
protected final java.util.concurrent.atomic.AtomicInteger in_flight_sends
Keeps track of the threads sending messages
-
delivery_table
protected final java.util.concurrent.ConcurrentMap<Address,BoundedHashMap<java.lang.Long,java.lang.Long>> delivery_table
-
flusher
protected volatile ForwardQueue.Flusher flusher
-
ack_promise
protected final Promise<java.lang.Long> ack_promise
Used for each resent message to wait until the message has been received
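Waiting for an ack after each resent message can be sketched with the JDK's CompletableFuture as a stand-in for the JGroups Promise class, whose API is not shown on this page. AckWaitSketch and its methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: the resending thread waits here, the receiving thread calls ack().
final class AckWaitSketch {
    private volatile CompletableFuture<Long> pending = new CompletableFuture<>();

    // Prepare for the next resent message by discarding any earlier result.
    void reset() {
        pending = new CompletableFuture<>();
    }

    // Called when an ack for the given id arrives.
    void ack(long id) {
        pending.complete(id);
    }

    // Returns true only if the ack for expectedId arrived within the timeout.
    boolean waitForAck(long expectedId, long timeoutMs) {
        try {
            Long acked = pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return acked != null && acked == expectedId;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

In a resend loop, the sender would call reset(), send one message, and then call waitForAck with that message's id before moving on to the next one.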
-
log
protected final Log log
-
delivery_table_max_size
protected int delivery_table_max_size
Size of the set to store received seqnos (for duplicate checking)
-
-
Constructor Detail
-
ForwardQueue
public ForwardQueue(Log log)
-
-
Method Detail
-
getUpProt
public Protocol getUpProt()
-
setUpProt
public void setUpProt(Protocol up_prot)
-
getDownProt
public Protocol getDownProt()
-
setDownProt
public void setDownProt(Protocol down_prot)
-
getLocalAddr
public Address getLocalAddr()
-
setLocalAddr
public void setLocalAddr(Address local_addr)
-
getDeliveryTableMaxSize
public int getDeliveryTableMaxSize()
-
setDeliveryTableMaxSize
public void setDeliveryTableMaxSize(int max_size)
-
deliveryTableSize
public int deliveryTableSize()
Total size of all queues of the delivery table
-
start
public void start()
-
stop
public void stop()
-
send
public void send(long id, Message msg)
-
receive
public void receive(long id, Message msg)
-
ack
public void ack(long id)
-
size
public int size()
-
doFlush
protected void doFlush(Address new_target) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
flushMessagesInForwardTable
protected void flushMessagesInForwardTable(Address target)
Sends all messages currently in forward_table to the new target (changing the dest field). This is necessary so that the underlying reliable unicast protocol (e.g. UNICAST) adds these messages to its retransmission mechanism.
Note that the messages must be resent in order of their seqnos. Other messages must also be prevented from being inserted until the resend is done, which is why there is synchronization.
Access to the forward_table doesn't need to be synchronized, as there won't be any insertions during flushing (all down-threads are blocked).
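The resend step described above can be sketched as a loop over a sorted table that rewrites each message's destination and passes it down. This is an illustration only: FlushSketch, its nested Msg class (a destination plus a payload) and the Consumer standing in for the down protocol are hypothetical and not part of JGroups.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.function.Consumer;

final class FlushSketch {
    // Hypothetical minimal message: a mutable destination and a payload.
    static final class Msg {
        String dest;
        final String payload;

        Msg(String dest, String payload) {
            this.dest = dest;
            this.payload = payload;
        }
    }

    // Re-targets every pending message and hands it to 'down' in ascending seqno order.
    // Assumes the caller has already blocked all senders, so the table is not modified here.
    static int resendAll(NavigableMap<Long, Msg> forwardTable, String newTarget, Consumer<Msg> down) {
        int sent = 0;
        for (Map.Entry<Long, Msg> entry : forwardTable.entrySet()) {
            Msg msg = entry.getValue();
            msg.dest = newTarget;   // change the dest field to the new target
            down.accept(msg);       // stand-in for passing the message down the stack
            sent++;
        }
        return sent;
    }
}
```

The messages stay in the table after being resent; in this sketch they would only be removed once the new target acknowledges them.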
-
canDeliver
protected boolean canDeliver(Address sender, long seqno)
Checks if seqno has already been received from sender. This weeds out duplicates. Note that this method is never called concurrently for the same sender.
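A per-sender duplicate check with a bounded memory of seen seqnos could look like the sketch below. It assumes a LinkedHashMap with removeEldestEntry as a stand-in for BoundedHashMap, whose behavior is not documented on this page, and uses String in place of Address. DuplicateFilterSketch and its members are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical duplicate filter: one bounded map of seen seqnos per sender.
final class DuplicateFilterSketch {
    private final int maxSize;
    private final ConcurrentMap<String, Map<Long, Long>> deliveryTable = new ConcurrentHashMap<>();

    DuplicateFilterSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    // Returns true the first time a (sender, seqno) pair is seen, false for a duplicate.
    // Like the documented method, this assumes no concurrent calls for the same sender.
    boolean canDeliver(String sender, long seqno) {
        Map<Long, Long> seen = deliveryTable.computeIfAbsent(sender, k -> newBoundedMap());
        return seen.putIfAbsent(seqno, seqno) == null;
    }

    // Total number of remembered seqnos across all senders.
    int deliveryTableSize() {
        int total = 0;
        for (Map<Long, Long> seen : deliveryTable.values())
            total += seen.size();
        return total;
    }

    // Insertion-ordered map that drops its oldest entry once maxSize is exceeded.
    private Map<Long, Long> newBoundedMap() {
        return new LinkedHashMap<Long, Long>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Long> eldest) {
                return size() > maxSize;
            }
        };
    }
}
```

One consequence of bounding the map this way is that a seqno evicted from it would be accepted again if it arrived much later, so the maximum size trades memory against the window in which duplicates are detected.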
-
block
protected void block()
-
unblockAll
protected void unblockAll()
-
startFlusher
protected void startFlusher(Address new_coord)
-
stopFlusher
protected void stopFlusher()
-
-