Class ForwardQueue


  • public class ForwardQueue
    extends java.lang.Object
    Forwards messages in FIFO order to a destination. Uses IDs to prevent duplicates. Used by FORWARD_TO_COORD.
    Since:
    3.3
    Author:
    Bela Ban
    • Field Detail

      • down_prot

        protected Protocol down_prot
      • local_addr

        protected Address local_addr
      • forward_table

        protected final java.util.NavigableMap<java.lang.Long,​Message> forward_table
        Maintains messages forwarded to the target which which no ack has been received yet. Needs to be sorted so we can resend them in the right order
      • send_lock

        protected final java.util.concurrent.locks.Lock send_lock
      • send_cond

        protected final java.util.concurrent.locks.Condition send_cond
      • flushing

        protected volatile boolean flushing
        Set when we block all sending threads to resend all messages from forward_table
      • running

        protected volatile boolean running
      • in_flight_sends

        protected final java.util.concurrent.atomic.AtomicInteger in_flight_sends
        Keeps track of the threads sending messages
      • delivery_table

        protected final java.util.concurrent.ConcurrentMap<Address,​BoundedHashMap<java.lang.Long,​java.lang.Long>> delivery_table
      • ack_promise

        protected final Promise<java.lang.Long> ack_promise
        Used for each resent message to wait until the message has been received
      • log

        protected final Log log
      • delivery_table_max_size

        protected int delivery_table_max_size
        Size of the set to store received seqnos (for duplicate checking)
    • Constructor Detail

      • ForwardQueue

        public ForwardQueue​(Log log)
    • Method Detail

      • getUpProt

        public Protocol getUpProt()
      • setUpProt

        public void setUpProt​(Protocol up_prot)
      • getDownProt

        public Protocol getDownProt()
      • setDownProt

        public void setDownProt​(Protocol down_prot)
      • getLocalAddr

        public Address getLocalAddr()
      • setLocalAddr

        public void setLocalAddr​(Address local_addr)
      • getDeliveryTableMaxSize

        public int getDeliveryTableMaxSize()
      • setDeliveryTableMaxSize

        public void setDeliveryTableMaxSize​(int max_size)
      • deliveryTableSize

        public int deliveryTableSize()
        Total size of all queues of the delivery table
      • start

        public void start()
      • stop

        public void stop()
      • send

        public void send​(long id,
                         Message msg)
      • receive

        public void receive​(long id,
                            Message msg)
      • flush

        public void flush​(Address new_target,
                          java.util.List<Address> mbrs)
      • ack

        public void ack​(long id)
      • size

        public int size()
      • doFlush

        protected void doFlush​(Address new_target)
                        throws java.lang.InterruptedException
        Throws:
        java.lang.InterruptedException
      • flushMessagesInForwardTable

        protected void flushMessagesInForwardTable​(Address target)
        Sends all messages currently in forward_table to the new target (changing the dest field). This needs to be done, so the underlying reliable unicast protocol (e.g. UNICAST) adds these messages to its retransmission mechanism
        Note that we need to resend the messages in order of their seqnos ! We also need to prevent other message from being inserted until we're done, that's why there's synchronization.
        Access to the forward_table doesn't need to be synchronized as there won't be any insertions during flushing (all down-threads are blocked)
      • canDeliver

        protected boolean canDeliver​(Address sender,
                                     long seqno)
        Checks if seqno has already been received from sender. This weeds out duplicates. Note that this method is never called concurrently for the same sender.
      • block

        protected void block()
      • unblockAll

        protected void unblockAll()
      • startFlusher

        protected void startFlusher​(Address new_coord)
      • stopFlusher

        protected void stopFlusher()