Class ReliableUnicast

    • Field Detail

      • DEFAULT_XMIT_INTERVAL

        protected static final long DEFAULT_XMIT_INTERVAL
        See Also:
        Constant Field Values
      • conn_expiry_timeout

        protected long conn_expiry_timeout
      • conn_close_timeout

        protected long conn_close_timeout
      • max_retransmit_time

        protected long max_retransmit_time
      • xmit_interval

        protected long xmit_interval
      • xmits_enabled

        protected boolean xmits_enabled
      • log_not_found_msgs

        protected boolean log_not_found_msgs
      • sync_min_interval

        protected long sync_min_interval
      • max_xmit_req_size

        protected int max_xmit_req_size
      • max_batch_size

        protected int max_batch_size
      • send_atomically

        protected boolean send_atomically
      • reuse_message_batches

        protected boolean reuse_message_batches
      • loopback

        protected boolean loopback
      • DEFAULT_INITIAL_CAPACITY

        protected static final int DEFAULT_INITIAL_CAPACITY
        See Also:
        Constant Field Values
      • num_msgs_sent

        protected final java.util.concurrent.atomic.LongAdder num_msgs_sent
      • num_msgs_received

        protected final java.util.concurrent.atomic.LongAdder num_msgs_received
      • num_acks_sent

        protected final java.util.concurrent.atomic.LongAdder num_acks_sent
      • num_acks_received

        protected final java.util.concurrent.atomic.LongAdder num_acks_received
      • num_xmits

        protected final java.util.concurrent.atomic.LongAdder num_xmits
      • xmit_reqs_received

        protected final java.util.concurrent.atomic.LongAdder xmit_reqs_received
      • xmit_reqs_sent

        protected final java.util.concurrent.atomic.LongAdder xmit_reqs_sent
      • xmit_rsps_sent

        protected final java.util.concurrent.atomic.LongAdder xmit_rsps_sent
      • avg_delivery_batch_size

        protected final AverageMinMax avg_delivery_batch_size
      • sends_can_block

        protected boolean sends_can_block
      • is_trace

        protected boolean is_trace
      • relay_present

        protected boolean relay_present
      • cached_batches

        protected final java.util.Map<Address,​MessageBatch> cached_batches
        To cache batches for sending messages up the stack (https://issues.redhat.com/browse/JGRP-2841). Note: values are all _regular_ batches; OOB batches/messages have been passed up, or were removed when draining the table in removeAndDeliver()
      • recv_table_lock

        protected final java.util.concurrent.locks.ReentrantLock recv_table_lock
      • xmit_task_map

        protected final java.util.Map<Address,​java.lang.Long> xmit_task_map
        Used by the retransmit task to keep the last retransmitted seqno per member (applicable only for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539
      • xmit_task

        protected java.util.concurrent.Future<?> xmit_task
        RetransmitTask running every xmit_interval ms
      • members

        protected volatile java.util.List<Address> members
      • running

        protected volatile boolean running
      • last_conn_id

        protected short last_conn_id
      • timestamper

        protected final java.util.concurrent.atomic.AtomicInteger timestamper
      • last_sync_sent

        protected ExpiryCache<Address> last_sync_sent
        Keep track of when a SEND_FIRST_SEQNO message was sent to a given sender
      • num_loopbacks

        protected final java.util.concurrent.atomic.LongAdder num_loopbacks
      • DUMMY_OOB_MSG

        protected static final Message DUMMY_OOB_MSG
      • drop_oob_and_dont_loopback_msgs_filter

        protected final java.util.function.Predicate<Message> drop_oob_and_dont_loopback_msgs_filter
      • remove_filter

        protected static final java.util.function.Predicate<Message> remove_filter
      • BATCH_ACCUMULATOR

        protected static final java.util.function.BiConsumer<MessageBatch,​Message> BATCH_ACCUMULATOR
    • Constructor Detail

      • ReliableUnicast

        public ReliableUnicast()
    • Method Detail

      • createBuffer

        protected abstract Buffer<Message> createBuffer​(long initial_seqno)
      • getNumLoopbacks

        public long getNumLoopbacks()
      • getNumSendConnections

        public int getNumSendConnections()
      • getNumReceiveConnections

        public int getNumReceiveConnections()
      • getNumConnections

        public int getNumConnections()
      • getTimestamper

        public int getTimestamper()
      • setLevel

        public <T extends Protocol> T setLevel​(java.lang.String level)
        Description copied from class: Protocol
        Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
        Overrides:
        setLevel in class Protocol
        Parameters:
        level - The new level. Valid values are "fatal", "error", "warn", "info", "debug", "trace" (capitalization not relevant)
      • getXmitInterval

        public long getXmitInterval()
      • isXmitsEnabled

        public boolean isXmitsEnabled()
      • getConnExpiryTimeout

        public long getConnExpiryTimeout()
      • setConnExpiryTimeout

        public ReliableUnicast setConnExpiryTimeout​(long c)
      • getConnCloseTimeout

        public long getConnCloseTimeout()
      • setConnCloseTimeout

        public ReliableUnicast setConnCloseTimeout​(long c)
      • logNotFoundMsgs

        public boolean logNotFoundMsgs()
      • getSyncMinInterval

        public long getSyncMinInterval()
      • setSyncMinInterval

        public ReliableUnicast setSyncMinInterval​(long s)
      • getMaxXmitReqSize

        public int getMaxXmitReqSize()
      • reuseMessageBatches

        public boolean reuseMessageBatches()
      • reuseMessageBatches

        public ReliableUnicast reuseMessageBatches​(boolean b)
      • sendsCanBlock

        public boolean sendsCanBlock()
      • sendAtomically

        public boolean sendAtomically()
      • loopback

        public boolean loopback()
      • printConnections

        public java.lang.String printConnections()
      • printCachedBatches

        public java.lang.String printCachedBatches()
      • getNumMessagesSent

        @Deprecated
        public long getNumMessagesSent()
        Deprecated.
        Don't remove! https://issues.redhat.com/browse/JGRP-2814
      • getNumMessagesReceived

        @Deprecated
        public long getNumMessagesReceived()
        Deprecated.
        Don't remove! https://issues.redhat.com/browse/JGRP-2814
      • getNumAcksSent

        public long getNumAcksSent()
      • getNumAcksReceived

        public long getNumAcksReceived()
      • getNumXmits

        public long getNumXmits()
      • getMaxRetransmitTime

        public long getMaxRetransmitTime()
      • setMaxRetransmitTime

        public ReliableUnicast setMaxRetransmitTime​(long max_retransmit_time)
      • isXmitTaskRunning

        public boolean isXmitTaskRunning()
      • getAgeOutCacheSize

        public int getAgeOutCacheSize()
      • printAgeOutCache

        public java.lang.String printAgeOutCache()
      • hasSendConnectionTo

        public boolean hasSendConnectionTo​(Address dest)
        Used for testing only
      • getNumUnackedMessages

        public int getNumUnackedMessages()
        The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)
      • getXmitTableUndeliveredMessages

        public int getXmitTableUndeliveredMessages()
      • getXmitTableMissingMessages

        public int getXmitTableMissingMessages()
      • getXmitTableDeliverableMessages

        public int getXmitTableDeliverableMessages()
      • printReceiveWindowMessages

        public java.lang.String printReceiveWindowMessages()
      • printSendWindowMessages

        public java.lang.String printSendWindowMessages()
      • init

        public void init()
                  throws java.lang.Exception
        Description copied from class: Protocol
        Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
        Specified by:
        init in interface Lifecycle
        Overrides:
        init in class Protocol
        Throws:
        java.lang.Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception
      • start

        public void start()
                   throws java.lang.Exception
        Description copied from class: Protocol
        This method is called on a JChannel.connect(String); starts work. Protocols are connected and ready to receive events. Will be called from bottom to top.
        Specified by:
        start in interface Lifecycle
        Overrides:
        start in class Protocol
        Throws:
        java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
      • up

        public java.lang.Object up​(Message msg)
        Description copied from class: Protocol
        A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
        Overrides:
        up in class Protocol
      • up

        public void up​(MessageBatch batch)
        Description copied from class: Protocol
        Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

        The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

        Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

        Overrides:
        up in class Protocol
        Parameters:
        batch - The message batch
      • down

        public java.lang.Object down​(Event evt)
        Description copied from class: Protocol
        An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
        Overrides:
        down in class Protocol
      • down

        public java.lang.Object down​(Message msg)
        Description copied from class: Protocol
        A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
        Overrides:
        down in class Protocol
      • isLocalSiteMaster

        protected boolean isLocalSiteMaster​(Address dest)
      • isLocal

        protected boolean isLocal​(Address addr)
      • closeConnection

        public void closeConnection​(Address mbr)
        Removes and resets the member's entry from the connection table (which is already locked). The method is declared void, so it does not report whether the member was found. This method is public only so it can be invoked by unit testing, but should not otherwise be used!
      • closeSendConnection

        public void closeSendConnection​(Address mbr)
      • closeReceiveConnection

        public void closeReceiveConnection​(Address mbr)
      • removeSendConnection

        public void removeSendConnection​(Address mbr)
      • removeReceiveConnection

        public void removeReceiveConnection​(Address mbr)
      • removeAllConnections

        public void removeAllConnections()
        This method is public only so it can be invoked by unit testing, but should not otherwise be used!
      • retransmit

        protected void retransmit​(SeqnoList missing,
                                  Address sender,
                                  Address real_dest)
        Sends a retransmit request to the given sender
      • retransmit

        protected void retransmit​(Message msg)
        Called by the sender to resend messages for which no ACK has been received yet
      • handleDataReceived

        protected void handleDataReceived​(Address sender,
                                          long seqno,
                                          short conn_id,
                                          boolean first,
                                          Message msg)
        Check whether the hashtable contains an entry e for sender (create if not). If e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and add message. Set e.received_msgs to the new window. Else just add the message.
      • handleDataReceivedFromSelf

        protected void handleDataReceivedFromSelf​(Address sender,
                                                  long seqno,
                                                  Message msg)
        Called when the sender of a message is the local member. In this case, we don't need to add the message to the table as the sender already did that
      • removeAndDeliver

        protected void removeAndDeliver​(ReliableUnicast.Entry entry,
                                        Address sender,
                                        AsciiString cluster,
                                        int min_size)
        Try to remove as many messages as possible from the table and pass them up. Prevents concurrent passing up of messages by different threads (https://issues.redhat.com/browse/JGRP-198); lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time. We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered in the order in which they were sent
      • printMessageList

        protected java.lang.String printMessageList​(java.util.List<LongTuple<Message>> list)
      • handleAckReceived

        protected void handleAckReceived​(Address sender,
                                         long seqno,
                                         short conn_id,
                                         int timestamp)
        Add the ACK to hashtable.sender.sent_msgs
      • handleResendingOfFirstMessage

        protected void handleResendingOfFirstMessage​(Address sender,
                                                     int timestamp)
        We need to resend the first message with our conn_id
      • handleXmitRequest

        protected void handleXmitRequest​(Address sender,
                                         SeqnoList missing)
      • addToSendBuffer

        protected boolean addToSendBuffer​(Buffer<Message> win,
                                          long seq,
                                          Message msg,
                                          java.util.function.Predicate<Message> filter,
                                          boolean dont_block)
        Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed.
        Returns:
        True if added successfully. False if not, e.g. no space in buffer and DONT_BLOCK set, or message already present, or seqno lower than buffer.low
      • resend

        protected void resend​(Message msg)
      • deliverMessage

        protected void deliverMessage​(Message msg,
                                      Address sender,
                                      long seqno)
      • getTimestamp

        protected long getTimestamp()
      • startRetransmitTask

        public void startRetransmitTask()
      • stopRetransmitTask

        public void stopRetransmitTask()
      • isCallerRunsHandler

        protected static boolean isCallerRunsHandler​(java.util.concurrent.RejectedExecutionHandler h)
      • getNewConnectionId

        protected short getNewConnectionId()
      • sendRequestForFirstSeqno

        protected void sendRequestForFirstSeqno​(Address dest,
                                                Address original_dest)
      • sendClose

        public void sendClose​(Address dest,
                              short conn_id)
      • closeIdleConnections

        public void closeIdleConnections()
      • removeExpiredConnections

        public int removeExpiredConnections()
      • removeConnections

        public int removeConnections​(boolean remove_send_connections,
                                     boolean remove_receive_connections)
        Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).
        Parameters:
        remove_send_connections - If true, send connections whose state is !OPEN are destroyed and removed
        remove_receive_connections - If true, receive connections with state !OPEN are destroyed and removed
        Returns:
        The number of connections which were removed
      • triggerXmit

        public void triggerXmit()
      • sendPendingAcks

        public void sendPendingAcks()
      • compare

        protected static int compare​(int ts1,
                                     int ts2)
        Compares 2 timestamps, handles numeric overflow
      • accumulate

        @SafeVarargs
        protected static int accumulate​(java.util.function.ToIntFunction<Buffer<Message>> func,
                                        java.util.Collection<? extends ReliableUnicast.Entry>... entries)
      • max

        protected static short max​(short a,
                                   short b)
      • getLowestSeqno

        protected static Tuple<java.lang.Long,​java.lang.Boolean> getLowestSeqno​(short prot_id,
                                                                                      java.util.List<LongTuple<Message>> list)