org.jgroups.stack
Class NakReceiverWindow

java.lang.Object
  extended by org.jgroups.stack.NakReceiverWindow

public class NakReceiverWindow
extends java.lang.Object

Keeps track of messages according to their sequence numbers (seqnos). Messages may be added out of order and with gaps between their seqnos. Method remove() removes the message whose seqno is 1 higher than highest_delivered (this variable is then incremented); it returns null if no message is present, or if the message with that seqno has not been received yet.

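When adding a message leaves a gap, the seqnos of the missing messages are added to the Retransmitter, which (using a timer) requests retransmission of the missing messages and keeps trying until each message has been received, or the member who sent it is suspected. There are 2 variables which keep track of messages:

  highest_delivered - the highest seqno consumed by the application (by remove()); the next seqno to remove is highest_delivered + 1
  highest_received - the highest seqno received so far, which may be higher than highest_delivered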

Note that the first seqno expected is 1. This design is described in doc/design.NAKACK.txt

Example: for received seqnos 1,2,3,5,6,8: highest_delivered=2 (or 3, depending on whether remove() was called again), highest_received=8
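
The bookkeeping in the example above can be illustrated with a minimal sketch. SeqnoWindowSketch and all of its members are hypothetical names introduced here for illustration; this is not the JGroups implementation. It omits locking and retransmission, uses String in place of Message, and (as a simplification) drops a message from its table as soon as it is removed.

```java
import java.util.TreeMap;

/** Hypothetical, simplified model of the window; not the JGroups implementation. */
class SeqnoWindowSketch {
    private final TreeMap<Long, String> msgs = new TreeMap<>();
    private long highestDelivered; // seqno of the last message handed out by remove()
    private long highestReceived;  // highest seqno accepted by add()

    SeqnoWindowSketch(long highestDelivered) {
        this.highestDelivered = highestDelivered;
        this.highestReceived = highestDelivered;
    }

    /** Stores a message unless it was already delivered or is a duplicate. */
    boolean add(long seqno, String msg) {
        if (seqno <= highestDelivered || msgs.containsKey(seqno)) {
            return false;
        }
        msgs.put(seqno, msg);
        highestReceived = Math.max(highestReceived, seqno);
        return true;
    }

    /** Returns the message with seqno highestDelivered + 1, or null if it is missing. */
    String remove() {
        String msg = msgs.remove(highestDelivered + 1);
        if (msg != null) {
            highestDelivered++;
        }
        return msg;
    }

    long getHighestDelivered() { return highestDelivered; }
    long getHighestReceived()  { return highestReceived; }

    public static void main(String[] args) {
        SeqnoWindowSketch win = new SeqnoWindowSketch(0); // first expected seqno is 1
        for (long s : new long[] {1, 2, 3, 5, 6, 8}) {
            win.add(s, "m" + s);
        }
        win.remove();
        win.remove();
        // prints "2 8"
        System.out.println(win.getHighestDelivered() + " " + win.getHighestReceived());
    }
}
```

A third remove() would return the message with seqno 3; a fourth would return null, because seqno 4 is still missing.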

Author:
Bela Ban

Nested Class Summary
static interface NakReceiverWindow.Listener
           
 
Field Summary
protected static Log log
           
 
Constructor Summary
NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long highest_delivered_seqno, TimeScheduler sched)
          Creates a new instance with the given retransmit command
NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long highest_delivered_seqno, TimeScheduler sched, boolean use_range_based_retransmitter)
           
NakReceiverWindow(Address sender, Retransmitter.RetransmitCommand cmd, long highest_delivered_seqno, TimeScheduler sched, boolean use_range_based_retransmitter, int num_rows, int msgs_per_row, double resize_factor, long max_compaction_time, boolean automatic_purging)
           
 
Method Summary
 boolean add(long seqno, Message msg)
          Adds a message according to its seqno (sequence number).
 void compact()
           
 void destroy()
          Destroys the NakReceiverWindow.
 Message get(long seqno)
          Returns the message with the given seqno from xmit_table
 java.util.List<Message> get(long from, long to)
          Returns a list of messages in the range [from .. to], including from and to
 long[] getDigest()
          Returns the lowest, highest delivered and highest received seqnos
 long getHighestDelivered()
          Returns the highest sequence number of a message consumed by the application (by remove()).
 long getHighestReceived()
          Returns the highest sequence number received so far (which may be higher than the highest seqno delivered so far); e.g., for 1,2,3,5,6 it would be 6.
 int getMissingMessages()
           
 int getPendingXmits()
           
 java.util.concurrent.atomic.AtomicBoolean getProcessing()
           
 int getRetransmitTableCapacity()
           
 double getRetransmitTableFillFactor()
           
 long getRetransmitTableOffset()
           
 int getRetransmitTableSize()
           
 java.lang.String printLossRate()
           
protected  java.lang.String printMessages()
          Prints xmit_table.
 java.lang.String printRetransmitStats()
           
 Message remove()
           
 Message remove(boolean acquire_lock, boolean remove_msg)
           
 java.util.List<Message> removeMany(java.util.concurrent.atomic.AtomicBoolean processing)
          Removes as many messages as possible
 java.util.List<Message> removeMany(java.util.concurrent.atomic.AtomicBoolean processing, boolean remove_msgs, int max_results)
          Removes as many messages as possible
 long setHighestDelivered(long new_val)
           
 void setListener(NakReceiverWindow.Listener l)
           
 void setRetransmitTimeouts(Interval timeouts)
           
 void setXmitStaggerTimeout(long timeout)
           
 int size()
           
 long sizeOfAllMessages(boolean include_headers)
          Returns the number of bytes taken up by all of the messages in the RetransmitTable
 void stable(long seqno)
          Deletes all messages whose sequence number is <= seqno (they are stable, that is, have been delivered by all members).
 java.lang.String toString()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

log

protected static final Log log
Constructor Detail

NakReceiverWindow

public NakReceiverWindow(Address sender,
                         Retransmitter.RetransmitCommand cmd,
                         long highest_delivered_seqno,
                         TimeScheduler sched)
Creates a new instance with the given retransmit command

Parameters:
sender - The sender associated with this instance
cmd - The command used to retransmit a missing message, will be invoked by the table. If null, the retransmit thread will not be started
highest_delivered_seqno - The next seqno to remove is highest_delivered_seqno +1
sched - the external scheduler to use for retransmission requests of missing msgs. If it's not provided or is null, an internal

NakReceiverWindow

public NakReceiverWindow(Address sender,
                         Retransmitter.RetransmitCommand cmd,
                         long highest_delivered_seqno,
                         TimeScheduler sched,
                         boolean use_range_based_retransmitter)

NakReceiverWindow

public NakReceiverWindow(Address sender,
                         Retransmitter.RetransmitCommand cmd,
                         long highest_delivered_seqno,
                         TimeScheduler sched,
                         boolean use_range_based_retransmitter,
                         int num_rows,
                         int msgs_per_row,
                         double resize_factor,
                         long max_compaction_time,
                         boolean automatic_purging)
Method Detail

getProcessing

public java.util.concurrent.atomic.AtomicBoolean getProcessing()

setRetransmitTimeouts

public void setRetransmitTimeouts(Interval timeouts)

setXmitStaggerTimeout

public void setXmitStaggerTimeout(long timeout)

setListener

public void setListener(NakReceiverWindow.Listener l)

getPendingXmits

public int getPendingXmits()

getRetransmitTableSize

public int getRetransmitTableSize()

getRetransmitTableCapacity

public int getRetransmitTableCapacity()

getRetransmitTableFillFactor

public double getRetransmitTableFillFactor()

getRetransmitTableOffset

public long getRetransmitTableOffset()

compact

public void compact()

add

public boolean add(long seqno,
                   Message msg)
Adds a message according to its seqno (sequence number).

There are 4 cases where messages are added:

  1. seqno is the next to be expected seqno: added to map
  2. seqno is <= highest_delivered: discard as we've already delivered it
  3. seqno is smaller than the next expected seqno (but greater than highest_delivered): a previously missing message, add it
  4. seqno is greater than the next expected seqno: add it to map and fill the gaps with null messages for retransmission. Add the seqno to the retransmitter too

Returns:
True if the message was added successfully, false otherwise (e.g. duplicate message)
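
The four cases above can be sketched as a small classifier. AddCaseSketch, its enum and both methods are hypothetical names used only for illustration. The sketch assumes that "the next expected seqno" means highest_received + 1, which the text above does not state explicitly, and it does not model duplicate detection inside a gap.

```java
/** Hypothetical classifier for the four add() cases; not the JGroups implementation. */
class AddCaseSketch {
    enum Case { NEXT_EXPECTED, ALREADY_DELIVERED, FILLS_GAP, AHEAD_WITH_GAP }

    static Case classify(long seqno, long highestDelivered, long highestReceived) {
        long nextExpected = highestReceived + 1; // assumption made by this sketch
        if (seqno == nextExpected) {
            return Case.NEXT_EXPECTED;       // case 1: add to the table
        }
        if (seqno <= highestDelivered) {
            return Case.ALREADY_DELIVERED;   // case 2: discard
        }
        if (seqno < nextExpected) {
            return Case.FILLS_GAP;           // case 3: a previously missing message
        }
        return Case.AHEAD_WITH_GAP;          // case 4: add and request the missing seqnos
    }

    /** For case 4: the inclusive range {from, to} of seqnos that are now missing. */
    static long[] missingRange(long seqno, long highestReceived) {
        return new long[] {highestReceived + 1, seqno - 1};
    }
}
```

With highest_delivered=3 and highest_received=8, seqno 9 falls under case 1, seqno 2 under case 2, seqno 4 under case 3, and seqno 11 under case 4 with seqnos 9 and 10 missing.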

remove

public Message remove()

remove

public Message remove(boolean acquire_lock,
                      boolean remove_msg)

removeMany

public java.util.List<Message> removeMany(java.util.concurrent.atomic.AtomicBoolean processing)
Removes as many messages as possible

Returns:
A list of messages, or null if no available messages were found

removeMany

public java.util.List<Message> removeMany(java.util.concurrent.atomic.AtomicBoolean processing,
                                          boolean remove_msgs,
                                          int max_results)
Removes as many messages as possible

Parameters:
remove_msgs - Removes messages from xmit_table
max_results - Max number of messages to remove in one batch
Returns:
A list of messages, or null if no available messages were found
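
A rough sketch of batch removal follows: it takes consecutive messages starting at highest_delivered + 1 and stops at the first gap or when the batch is full. RemoveManySketch and its members are hypothetical. The sketch leaves out the processing flag and all locking, and it treats a non-positive maxResults as "no limit", which is an assumption of the sketch rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of batch removal; not the JGroups implementation. */
class RemoveManySketch {
    final TreeMap<Long, String> table = new TreeMap<>();
    long highestDelivered;

    List<String> removeMany(boolean removeMsgs, int maxResults) {
        // Assumption of this sketch: a non-positive limit means "no limit".
        int limit = maxResults > 0 ? maxResults : Integer.MAX_VALUE;
        List<String> batch = new ArrayList<>();
        while (batch.size() < limit) {
            long next = highestDelivered + 1;
            // Either take the message out of the table or only read it.
            String msg = removeMsgs ? table.remove(next) : table.get(next);
            if (msg == null) {
                break; // gap: the next message has not been received yet
            }
            batch.add(msg);
            highestDelivered = next;
        }
        return batch.isEmpty() ? null : batch;
    }
}
```

With seqnos 1,2,3,5,6 in the table and highestDelivered=0, removeMany(true, 2) returns the messages for seqnos 1 and 2; a following call returns only the message for seqno 3, because seqno 4 is missing.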

stable

public void stable(long seqno)
Deletes all messages whose sequence number is <= seqno (they are stable, that is, have been delivered by all members). Stops when a sequence number > seqno is encountered (all messages are ordered by seqno).
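
The purge described above can be sketched with an ordered map: walk the entries in ascending seqno order, delete each one, and stop at the first seqno above the stable one. StableSketch and purge() are hypothetical names, and the TreeMap merely stands in for the retransmit table.

```java
import java.util.Iterator;
import java.util.TreeMap;

/** Hypothetical model of purging stable messages; not the JGroups implementation. */
class StableSketch {
    /** Deletes every entry with a key <= seqno and returns how many were deleted. */
    static int purge(TreeMap<Long, String> table, long seqno) {
        int purged = 0;
        Iterator<Long> it = table.keySet().iterator(); // ascending seqno order
        while (it.hasNext()) {
            if (it.next() > seqno) {
                break; // all remaining entries are newer than the stable seqno
            }
            it.remove();
            purged++;
        }
        return purged;
    }
}
```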


destroy

public void destroy()
Destroys the NakReceiverWindow. After this method returns, no new messages can be added and a new NakReceiverWindow should be used instead. Note that messages can still be removed though.


getDigest

public long[] getDigest()
Returns the lowest, highest delivered and highest received seqnos


getHighestDelivered

public long getHighestDelivered()
Returns the highest sequence number of a message consumed by the application (by remove()). Note that this is different from the highest deliverable seqno. E.g., in 23,24,26,27,29, the highest delivered message may be 22, whereas the highest deliverable message would be 24.

Returns:
the highest sequence number of a message consumed by the application (by remove())
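
The difference between "delivered" and "deliverable" can be shown with a short sketch. DeliverableSketch and highestDeliverable() are hypothetical helpers that are not part of this class: starting from the highest delivered seqno, the helper advances for as long as the next seqno has been received.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper illustrating "deliverable"; not part of NakReceiverWindow. */
class DeliverableSketch {
    /** Returns the last seqno reachable from highestDelivered without crossing a gap. */
    static long highestDeliverable(Set<Long> received, long highestDelivered) {
        long seqno = highestDelivered;
        while (received.contains(seqno + 1)) {
            seqno++;
        }
        return seqno;
    }

    public static void main(String[] args) {
        Set<Long> received = new TreeSet<>(Arrays.asList(23L, 24L, 26L, 27L, 29L));
        // prints "24": seqno 25 is missing, so 26 and later cannot be delivered yet
        System.out.println(highestDeliverable(received, 22));
    }
}
```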

setHighestDelivered

public long setHighestDelivered(long new_val)

getHighestReceived

public long getHighestReceived()
Returns the highest sequence number received so far (which may be higher than the highest seqno delivered so far); e.g., for 1,2,3,5,6 it would be 6.

See Also:
getHighestDelivered()

get

public Message get(long seqno)
Returns the message with the given seqno from xmit_table

Parameters:
seqno - The sequence number of the message to return
Returns:
Message from xmit_table

get

public java.util.List<Message> get(long from,
                                   long to)
Returns a list of messages in the range [from .. to], including from and to

Parameters:
from - The lowest seqno of the range (inclusive)
to - The highest seqno of the range (inclusive)
Returns:
A list of messages, or null if none in range [from .. to] was found
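
An inclusive range lookup with the "null if nothing was found" convention can be sketched as follows. RangeGetSketch is a hypothetical name, and the TreeMap is only a stand-in for xmit_table, whose real type is not described here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of a range lookup; not the JGroups implementation. */
class RangeGetSketch {
    /** Returns the messages with from <= seqno <= to, or null if there are none. */
    static List<String> get(TreeMap<Long, String> table, long from, long to) {
        if (from > to) {
            return null; // subMap() would throw for an inverted range
        }
        Collection<String> hits = table.subMap(from, true, to, true).values();
        return hits.isEmpty() ? null : new ArrayList<>(hits);
    }
}
```

With seqnos 1,2,3,5,6 in the table, get(table, 2, 5) yields the messages for 2, 3 and 5, while get(table, 4, 4) yields null.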

size

public int size()

sizeOfAllMessages

public long sizeOfAllMessages(boolean include_headers)
Returns the number of bytes taken up by all of the messages in the RetransmitTable

Parameters:
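include_headers - Whether the size of the message headers is counted as well
Returns:
The number of bytes taken up by all of the messages in the RetransmitTable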

getMissingMessages

public int getMissingMessages()

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

printMessages

protected java.lang.String printMessages()
Prints xmit_table. The caller must hold the read lock.

Returns:
String

printLossRate

public java.lang.String printLossRate()

printRetransmitStats

public java.lang.String printRetransmitStats()


Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.