org.jgroups.util
Class ForwardQueue

java.lang.Object
  extended by org.jgroups.util.ForwardQueue

public class ForwardQueue
extends java.lang.Object

Forwards messages in FIFO order to a destination. Uses IDs to prevent duplicates. Used by FORWARD_TO_COORD.

Since:
3.3
Author:
Bela Ban

Nested Class Summary
protected  class ForwardQueue.Flusher
           
 
Field Summary
protected  Promise<java.lang.Long> ack_promise
          Used for each resent message to wait until the message has been received
protected  java.util.concurrent.ConcurrentMap<Address,java.util.NavigableSet<java.lang.Long>> delivery_table
           
protected  int delivery_table_max_size
          Size of the set to store received seqnos (for duplicate checking)
protected  Protocol down_prot
           
protected  ForwardQueue.Flusher flusher
           
protected  boolean flushing
          Set when we block all sending threads to resend all messages from forward_table
protected  java.util.NavigableMap<java.lang.Long,Message> forward_table
          Maintains messages forwarded to the target which which no ack has been received yet.
protected  java.util.concurrent.atomic.AtomicInteger in_flight_sends
          Keeps track of the threads sending messages
protected  Address local_addr
           
protected  Log log
           
protected  boolean running
           
protected  java.util.concurrent.locks.Condition send_cond
           
protected  java.util.concurrent.locks.Lock send_lock
           
protected  Protocol up_prot
           
 
Constructor Summary
ForwardQueue(Log log)
           
 
Method Summary
 void ack(long id)
           
protected  void block()
           
protected  boolean canDeliver(Address sender, long seqno)
          Checks if seqno has already been received from sender.
 int deliveryTableSize()
          Total size of all queues of the delivery table
protected  void doFlush(Address new_target)
           
 void flush(Address new_target, java.util.List<Address> mbrs)
           
protected  void flushMessagesInForwardTable(Address target)
          Sends all messages currently in forward_table to the new target (changing the dest field).
 int getDeliveryTableMaxSize()
           
 Protocol getDownProt()
           
 Address getLocalAddr()
           
 Protocol getUpProt()
           
 void receive(long id, Message msg)
           
 void send(long id, Message msg)
           
 void setDeliveryTableMaxSize(int max_size)
           
 void setDownProt(Protocol down_prot)
           
 void setLocalAddr(Address local_addr)
           
 void setUpProt(Protocol up_prot)
           
 int size()
           
 void start()
           
protected  void startFlusher(Address new_coord)
           
 void stop()
           
protected  void stopFlusher()
           
protected  void unblockAll()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

up_prot

protected Protocol up_prot

down_prot

protected Protocol down_prot

local_addr

protected Address local_addr

forward_table

protected final java.util.NavigableMap<java.lang.Long,Message> forward_table
Maintains messages forwarded to the target which which no ack has been received yet. Needs to be sorted so we can resend them in the right order


send_lock

protected final java.util.concurrent.locks.Lock send_lock

send_cond

protected final java.util.concurrent.locks.Condition send_cond

flushing

protected volatile boolean flushing
Set when we block all sending threads to resend all messages from forward_table


running

protected volatile boolean running

in_flight_sends

protected final java.util.concurrent.atomic.AtomicInteger in_flight_sends
Keeps track of the threads sending messages


delivery_table

protected final java.util.concurrent.ConcurrentMap<Address,java.util.NavigableSet<java.lang.Long>> delivery_table

flusher

protected volatile ForwardQueue.Flusher flusher

ack_promise

protected final Promise<java.lang.Long> ack_promise
Used for each resent message to wait until the message has been received


log

protected final Log log

delivery_table_max_size

protected int delivery_table_max_size
Size of the set to store received seqnos (for duplicate checking)

Constructor Detail

ForwardQueue

public ForwardQueue(Log log)
Method Detail

getUpProt

public Protocol getUpProt()

setUpProt

public void setUpProt(Protocol up_prot)

getDownProt

public Protocol getDownProt()

setDownProt

public void setDownProt(Protocol down_prot)

getLocalAddr

public Address getLocalAddr()

setLocalAddr

public void setLocalAddr(Address local_addr)

getDeliveryTableMaxSize

public int getDeliveryTableMaxSize()

setDeliveryTableMaxSize

public void setDeliveryTableMaxSize(int max_size)

deliveryTableSize

public int deliveryTableSize()
Total size of all queues of the delivery table


start

public void start()

stop

public void stop()

send

public void send(long id,
                 Message msg)

receive

public void receive(long id,
                    Message msg)

flush

public void flush(Address new_target,
                  java.util.List<Address> mbrs)

ack

public void ack(long id)

size

public int size()

doFlush

protected void doFlush(Address new_target)
                throws java.lang.InterruptedException
Throws:
java.lang.InterruptedException

flushMessagesInForwardTable

protected void flushMessagesInForwardTable(Address target)
Sends all messages currently in forward_table to the new target (changing the dest field). This needs to be done, so the underlying reliable unicast protocol (e.g. UNICAST) adds these messages to its retransmission mechanism
Note that we need to resend the messages in order of their seqnos ! We also need to prevent other message from being inserted until we're done, that's why there's synchronization.
Access to the forward_table doesn't need to be synchronized as there won't be any insertions during flushing (all down-threads are blocked)


canDeliver

protected boolean canDeliver(Address sender,
                             long seqno)
Checks if seqno has already been received from sender. This weeds out duplicates. Note that this method is never called concurrently for the same sender.


block

protected void block()

unblockAll

protected void unblockAll()

startFlusher

protected void startFlusher(Address new_coord)

stopFlusher

protected void stopFlusher()


Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.