org.jgroups.protocols.pbcast
Class NAKACK2

java.lang.Object
  extended by org.jgroups.stack.Protocol
      extended by org.jgroups.protocols.pbcast.NAKACK2
All Implemented Interfaces:
DiagnosticsHandler.ProbeHandler

public class NAKACK2
extends Protocol
implements DiagnosticsHandler.ProbeHandler

Negative AcKnowledgement layer (NAKs). Messages are assigned a monotonically increasing sequence number (seqno). Receivers deliver messages ordered according to seqno and request retransmission of missing messages.
Retransmit requests are usually sent to the original sender of a message, but this can be changed by xmit_from_random_member (send to random member) or use_mcast_xmit_req (send to everyone). Responses can also be sent to everyone instead of the requester by setting use_mcast_xmit to true.

Author:
Bela Ban

Nested Class Summary
protected static class NAKACK2.Counter
           
protected  class NAKACK2.RetransmitTask
          Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and sends retransmit request to all members from which we have missing messages
 
Field Summary
protected  BoundedList<Message> become_server_queue
           
protected  int become_server_queue_size
           
protected  BoundedList<java.lang.String> digest_history
          Keeps a bounded list of the last N digest sets
protected  boolean discard_delivered_msgs
          Messages that have been received in order are sent up the stack (= delivered to the application).
protected  boolean is_server
           
protected  boolean leaving
           
protected  Address local_addr
           
protected  boolean log_discard_msgs
          If true, logs messages discarded because received from other members
protected  boolean log_not_found_msgs
           
protected  int max_msg_batch_size
           
protected  long max_rebroadcast_timeout
           
protected  java.util.List<Address> members
           
protected  int num_messages_received
           
protected  int num_messages_sent
           
protected static int NUM_REBROADCAST_MSGS
           
protected  boolean print_stability_history_on_failed_xmit
          When not finding a message on an XMIT request, include the last N stability messages in the error message
protected  Digest rebroadcast_digest
           
protected  java.util.concurrent.locks.Lock rebroadcast_digest_lock
           
protected  java.util.concurrent.locks.Condition rebroadcast_done
           
protected  java.util.concurrent.locks.Lock rebroadcast_lock
           
protected  boolean rebroadcasting
           
protected  boolean running
           
protected  BoundedList<Digest> stability_msgs
          BoundedList, keeps the last 10 stability messages
protected  SuppressLog<Address> suppress_log_non_member
          Log to suppress identical warnings for messages from non-members
protected  long suppress_time_non_member_warnings
           
protected  TimeScheduler timer
           
protected  boolean use_mcast_xmit
          Retransmit messages using multicast rather than unicast.
protected  boolean use_mcast_xmit_req
          Use a multicast to request retransmission of missing messages.
protected  View view
           
protected  boolean xmit_from_random_member
          Ask a random member for retransmission of a missing message.
protected  long xmit_interval
           
protected  java.util.concurrent.atomic.AtomicLong xmit_reqs_received
           
protected  java.util.concurrent.atomic.AtomicLong xmit_reqs_sent
           
protected  java.util.concurrent.atomic.AtomicLong xmit_rsps_received
           
protected  java.util.concurrent.atomic.AtomicLong xmit_rsps_sent
           
protected  java.util.concurrent.ConcurrentMap<Address,Table<Message>> xmit_table
          Map to store sent and received messages (keyed by sender)
protected  long xmit_table_max_compaction_time
           
protected  int xmit_table_msgs_per_row
           
protected  int xmit_table_num_rows
           
protected  double xmit_table_resize_factor
           
protected  java.util.concurrent.Future<?> xmit_task
          RetransmitTask running every xmit_interval ms
protected  java.util.Map<Address,java.lang.Long> xmit_task_map
          Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539)
 
Fields inherited from class org.jgroups.stack.Protocol
down_prot, ergonomics, id, log, name, stack, stats, up_prot
 
Constructor Summary
NAKACK2()
           
 
Method Summary
protected  void adjustReceivers(java.util.List<Address> new_members)
          Remove old members from the retransmission buffers.
protected  void cancelRebroadcasting()
           
protected  void checkForRebroadcasts()
           
 void clearNonMemberCache()
           
 void compact()
           
protected  Table<Message> createTable(long initial_seqno)
           
 java.lang.Object down(Event evt)
          Callback.
 java.util.Map<java.lang.String,java.lang.Object> dumpStats()
           
 java.lang.String dumpXmitTablesNumCurrentRows()
           
protected  void flushBecomeServerQueue()
          Flushes the queue.
 int getBecomeServerQueueSizeActual()
           
 long getCurrentSeqno()
           
 Digest getDigest()
          Returns a message digest: for each member P the highest delivered and received seqno is added
 Digest getDigest(Address mbr)
           
 boolean getLogDiscardMessages()
           
 int getNonMemberMessages()
           
 long getSizeOfAllMessages()
           
 long getSizeOfAllMessagesInclHeaders()
           
 Table<Message> getWindow(Address sender)
          Returns the receive window for sender; only used for testing.
 long getXmitRequestsReceived()
           
 long getXmitRequestsSent()
           
 long getXmitResponsesReceived()
           
 long getXmitResponsesSent()
           
 long getXmitTableCapacity()
           
 int getXmitTableMissingMessages()
           
 int getXmitTableNumCompactions()
           
 int getXmitTableNumCurrentRows()
           
 int getXmitTableNumMoves()
           
 int getXmitTableNumPurges()
           
 int getXmitTableNumResizes()
           
 int getXmitTableUndeliveredMsgs()
           
protected  void handleMessage(Message msg, NakAckHeader2 hdr)
          Finds the corresponding retransmit buffer and adds the message to it (according to seqno).
 java.util.Map<java.lang.String,java.lang.String> handleProbe(java.lang.String... keys)
          Handles a probe.
protected  void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender)
          Retransmits messsages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester, called when XMIT_REQ is received.
protected  void handleXmitRsp(Message msg, NakAckHeader2 hdr)
           
 void init()
          Called after instance has been created (null constructor) and before protocol is started.
 boolean isDiscardDeliveredMsgs()
           
 boolean isUseMcastXmit()
           
 boolean isXmitFromRandomMember()
           
 boolean isXmitTaskRunning()
           
protected  void mergeDigest(Digest digest)
          For all members of the digest, adjust the retransmit buffers in xmit_table.
protected  void overwriteDigest(Digest digest)
          Overwrites existing entries, but does NOT remove entries not found in the digest
 java.lang.String printDigestHistory()
           
 java.lang.String printMessages()
           
 java.lang.String printStabilityHistory()
           
 java.lang.String printStabilityMessages()
           
 java.lang.String printStats()
           
 java.util.List<java.lang.Integer> providedUpServices()
          List of events that are provided to layers above (they will be handled when sent down from above)
protected  void rebroadcastMessages()
          Takes the argument highest_seqnos and compares it to the current digest.
protected  void reset()
           
 void resetStats()
           
protected  void retransmit(long first_seqno, long last_seqno, Address sender)
           
protected  void retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request)
           
protected  void retransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request)
           
protected  void send(Event evt, Message msg)
          Adds the message to the sent_msgs table and then passes it down the stack.
protected  void sendXmitRsp(Address dest, Message msg)
          Sends a message msg to the requester.
protected  void setDigest(Digest digest)
          Creates a retransmit buffer for each sender in the digest according to the sender's seqno.
protected  void setDigest(Digest digest, boolean merge)
          Sets or merges the digest.
 void setDiscardDeliveredMsgs(boolean discard_delivered_msgs)
           
 void setLogDiscardMessages(boolean flag)
           
 void setTimer(TimeScheduler timer)
          Only used for unit tests, don't use !
 void setUseMcastXmit(boolean use_mcast_xmit)
           
 void setUseMcastXmitReq(boolean flag)
           
 void setXmitFromRandomMember(boolean xmit_from_random_member)
           
protected static long sizeOfAllMessages(Table<Message> buf, boolean include_headers)
           
protected  void stable(Digest digest)
          Garbage collect messages that have been seen by all members.
 void start()
          This method is called on a Channel.connect(String).
protected  void startRetransmitTask()
           
 void stop()
          This method is called on a Channel.disconnect().
protected  void stopRetransmitTask()
           
 java.lang.String[] supportedKeys()
          Returns a list of supported keys
 void triggerXmit()
           
 java.lang.Object up(Event evt)
          Callback.
 
Methods inherited from class org.jgroups.stack.Protocol
destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, providedDownServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

NUM_REBROADCAST_MSGS

protected static final int NUM_REBROADCAST_MSGS
See Also:
Constant Field Values

max_msg_batch_size

protected int max_msg_batch_size

use_mcast_xmit

protected boolean use_mcast_xmit
Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a message, the sender only retransmits once


use_mcast_xmit_req

protected boolean use_mcast_xmit_req
Use a multicast to request retransmission of missing messages. This may be costly as every member in the cluster will send a response


xmit_from_random_member

protected boolean xmit_from_random_member
Ask a random member for retransmission of a missing message. If set to true, discard_delivered_msgs will be set to false


discard_delivered_msgs

protected boolean discard_delivered_msgs
Messages that have been received in order are sent up the stack (= delivered to the application). Delivered messages are removed from the retransmission buffer, so they can get GC'ed by the JVM. When this property is true, everyone (except the sender of a message) removes the message from their retransission buffers as soon as it has been delivered to the application


max_rebroadcast_timeout

protected long max_rebroadcast_timeout

print_stability_history_on_failed_xmit

protected boolean print_stability_history_on_failed_xmit
When not finding a message on an XMIT request, include the last N stability messages in the error message


log_discard_msgs

protected boolean log_discard_msgs
If true, logs messages discarded because received from other members


log_not_found_msgs

protected boolean log_not_found_msgs

xmit_interval

protected long xmit_interval

xmit_table_num_rows

protected int xmit_table_num_rows

xmit_table_msgs_per_row

protected int xmit_table_msgs_per_row

xmit_table_resize_factor

protected double xmit_table_resize_factor

xmit_table_max_compaction_time

protected long xmit_table_max_compaction_time

become_server_queue_size

protected int become_server_queue_size

suppress_time_non_member_warnings

protected long suppress_time_non_member_warnings

xmit_reqs_received

protected final java.util.concurrent.atomic.AtomicLong xmit_reqs_received

xmit_reqs_sent

protected final java.util.concurrent.atomic.AtomicLong xmit_reqs_sent

xmit_rsps_received

protected final java.util.concurrent.atomic.AtomicLong xmit_rsps_received

xmit_rsps_sent

protected final java.util.concurrent.atomic.AtomicLong xmit_rsps_sent

num_messages_sent

protected int num_messages_sent

num_messages_received

protected int num_messages_received

is_server

protected volatile boolean is_server

local_addr

protected Address local_addr

members

protected volatile java.util.List<Address> members

view

protected View view

xmit_table

protected final java.util.concurrent.ConcurrentMap<Address,Table<Message>> xmit_table
Map to store sent and received messages (keyed by sender)


xmit_task

protected java.util.concurrent.Future<?> xmit_task
RetransmitTask running every xmit_interval ms


xmit_task_map

protected final java.util.Map<Address,java.lang.Long> xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539)


leaving

protected volatile boolean leaving

running

protected volatile boolean running

timer

protected TimeScheduler timer

rebroadcast_lock

protected final java.util.concurrent.locks.Lock rebroadcast_lock

rebroadcast_done

protected final java.util.concurrent.locks.Condition rebroadcast_done

rebroadcasting

protected volatile boolean rebroadcasting

rebroadcast_digest_lock

protected final java.util.concurrent.locks.Lock rebroadcast_digest_lock

rebroadcast_digest

protected Digest rebroadcast_digest

stability_msgs

protected final BoundedList<Digest> stability_msgs
BoundedList, keeps the last 10 stability messages


digest_history

protected final BoundedList<java.lang.String> digest_history
Keeps a bounded list of the last N digest sets


become_server_queue

protected BoundedList<Message> become_server_queue

suppress_log_non_member

protected SuppressLog<Address> suppress_log_non_member
Log to suppress identical warnings for messages from non-members

Constructor Detail

NAKACK2

public NAKACK2()
Method Detail

isXmitTaskRunning

public boolean isXmitTaskRunning()

getNonMemberMessages

public int getNonMemberMessages()

clearNonMemberCache

public void clearNonMemberCache()

getXmitRequestsReceived

public long getXmitRequestsReceived()

getXmitRequestsSent

public long getXmitRequestsSent()

getXmitResponsesReceived

public long getXmitResponsesReceived()

getXmitResponsesSent

public long getXmitResponsesSent()

isUseMcastXmit

public boolean isUseMcastXmit()

isXmitFromRandomMember

public boolean isXmitFromRandomMember()

isDiscardDeliveredMsgs

public boolean isDiscardDeliveredMsgs()

getLogDiscardMessages

public boolean getLogDiscardMessages()

setUseMcastXmit

public void setUseMcastXmit(boolean use_mcast_xmit)

setUseMcastXmitReq

public void setUseMcastXmitReq(boolean flag)

setLogDiscardMessages

public void setLogDiscardMessages(boolean flag)

setXmitFromRandomMember

public void setXmitFromRandomMember(boolean xmit_from_random_member)

setDiscardDeliveredMsgs

public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs)

getBecomeServerQueueSizeActual

public int getBecomeServerQueueSizeActual()

getWindow

public Table<Message> getWindow(Address sender)
Returns the receive window for sender; only used for testing. Do not use !


setTimer

public void setTimer(TimeScheduler timer)
Only used for unit tests, don't use !


getXmitTableUndeliveredMsgs

public int getXmitTableUndeliveredMsgs()

getXmitTableMissingMessages

public int getXmitTableMissingMessages()

getXmitTableCapacity

public long getXmitTableCapacity()

getXmitTableNumCurrentRows

public int getXmitTableNumCurrentRows()

getSizeOfAllMessages

public long getSizeOfAllMessages()

getSizeOfAllMessagesInclHeaders

public long getSizeOfAllMessagesInclHeaders()

getXmitTableNumCompactions

public int getXmitTableNumCompactions()

getXmitTableNumMoves

public int getXmitTableNumMoves()

getXmitTableNumResizes

public int getXmitTableNumResizes()

getXmitTableNumPurges

public int getXmitTableNumPurges()

printMessages

public java.lang.String printMessages()

getCurrentSeqno

public long getCurrentSeqno()

printStabilityMessages

public java.lang.String printStabilityMessages()

printDigestHistory

public java.lang.String printDigestHistory()

compact

public void compact()

dumpXmitTablesNumCurrentRows

public java.lang.String dumpXmitTablesNumCurrentRows()

resetStats

public void resetStats()
Overrides:
resetStats in class Protocol

init

public void init()
          throws java.lang.Exception
Description copied from class: Protocol
Called after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.

Overrides:
init in class Protocol
Throws:
java.lang.Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception

dumpStats

public java.util.Map<java.lang.String,java.lang.Object> dumpStats()
Overrides:
dumpStats in class Protocol

printStats

public java.lang.String printStats()
Overrides:
printStats in class Protocol

printStabilityHistory

public java.lang.String printStabilityHistory()

providedUpServices

public java.util.List<java.lang.Integer> providedUpServices()
Description copied from class: Protocol
List of events that are provided to layers above (they will be handled when sent down from above)

Overrides:
providedUpServices in class Protocol

start

public void start()
           throws java.lang.Exception
Description copied from class: Protocol
This method is called on a Channel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.

Overrides:
start in class Protocol
Throws:
java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so Channel.connect(String) will throw an exception

stop

public void stop()
Description copied from class: Protocol
This method is called on a Channel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed

Overrides:
stop in class Protocol

down

public java.lang.Object down(Event evt)
Callback. Called by superclass when event may be handled.

Do not use down_prot.down() in this method as the event is passed down by default by the superclass after this method returns !

Overrides:
down in class Protocol

up

public java.lang.Object up(Event evt)
Callback. Called by superclass when event may be handled.

Do not use PassUp in this method as the event is passed up by default by the superclass after this method returns !

Overrides:
up in class Protocol

handleProbe

public java.util.Map<java.lang.String,java.lang.String> handleProbe(java.lang.String... keys)
Description copied from interface: DiagnosticsHandler.ProbeHandler
Handles a probe. For each key that is handled, the key and its result should be in the returned map.

Specified by:
handleProbe in interface DiagnosticsHandler.ProbeHandler
Returns:
Map. A map of keys and values. A null return value is permissible.

supportedKeys

public java.lang.String[] supportedKeys()
Description copied from interface: DiagnosticsHandler.ProbeHandler
Returns a list of supported keys

Specified by:
supportedKeys in interface DiagnosticsHandler.ProbeHandler

send

protected void send(Event evt,
                    Message msg)
Adds the message to the sent_msgs table and then passes it down the stack. Change Bela Ban May 26 2002: we don't store a copy of the message, but a reference ! This saves us a lot of memory. However, this also means that a message should not be changed after storing it in the sent-table ! See protocols/DESIGN for details. Made seqno increment and adding to sent_msgs atomic, e.g. seqno won't get incremented if adding to sent_msgs fails e.g. due to an OOM (see http://jira.jboss.com/jira/browse/JGRP-179). bela Jan 13 2006


handleMessage

protected void handleMessage(Message msg,
                             NakAckHeader2 hdr)
Finds the corresponding retransmit buffer and adds the message to it (according to seqno). Then removes as many messages as possible and passes them up the stack. Discards messages from non-members.


handleXmitReq

protected void handleXmitReq(Address xmit_requester,
                             SeqnoList missing_msgs,
                             Address original_sender)
Retransmits messsages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester, called when XMIT_REQ is received.

Parameters:
xmit_requester - The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
missing_msgs - A list of seqnos that have to be retransmitted
original_sender - The member who originally sent the messsage. Guaranteed to be non-null

flushBecomeServerQueue

protected void flushBecomeServerQueue()
Flushes the queue. Done in a separate thread as we don't want to block the ClientGmsImpl.installView(org.jgroups.View,org.jgroups.util.Digest) method (called when a view is installed).


cancelRebroadcasting

protected void cancelRebroadcasting()

sendXmitRsp

protected void sendXmitRsp(Address dest,
                           Message msg)
Sends a message msg to the requester. We have to wrap the original message into a retransmit message, as we need to preserve the original message's properties, such as src, headers etc.

Parameters:
dest -
msg -

handleXmitRsp

protected void handleXmitRsp(Message msg,
                             NakAckHeader2 hdr)

rebroadcastMessages

protected void rebroadcastMessages()
Takes the argument highest_seqnos and compares it to the current digest. If the current digest has fewer messages, then send retransmit messages for the missing messages. Return when all missing messages have been received. If we're waiting for a missing message from P, and P crashes while waiting, we need to exclude P from the wait set.


checkForRebroadcasts

protected void checkForRebroadcasts()

adjustReceivers

protected void adjustReceivers(java.util.List<Address> new_members)
Remove old members from the retransmission buffers. Essentially removes all members that are not in members. This method is not called concurrently multiple times


getDigest

public Digest getDigest()
Returns a message digest: for each member P the highest delivered and received seqno is added


getDigest

public Digest getDigest(Address mbr)

setDigest

protected void setDigest(Digest digest)
Creates a retransmit buffer for each sender in the digest according to the sender's seqno. If a buffer already exists, it resets it.


mergeDigest

protected void mergeDigest(Digest digest)
For all members of the digest, adjust the retransmit buffers in xmit_table. If no entry exists, create one with the initial seqno set to the seqno of the member in the digest. If the member already exists, and is not the local address, replace it with the new entry (http://jira.jboss.com/jira/browse/JGRP-699) if the digest's seqno is greater than the seqno in the window.


overwriteDigest

protected void overwriteDigest(Digest digest)
Overwrites existing entries, but does NOT remove entries not found in the digest

Parameters:
digest -

setDigest

protected void setDigest(Digest digest,
                         boolean merge)
Sets or merges the digest. If there is no entry for a given member in xmit_table, create a new buffer. Else skip the existing entry, unless it is a merge. In this case, skip the existing entry if its seqno is greater than or equal to the one in the digest, or reset the window and create a new one if not.

Parameters:
digest - The digest
merge - Whether to merge the new digest with our own, or not

createTable

protected Table<Message> createTable(long initial_seqno)

stable

protected void stable(Digest digest)
Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update xmit_table: for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the retransmit buffer corresponding to P which are <= seqno at digest[P].


retransmit

protected void retransmit(long first_seqno,
                          long last_seqno,
                          Address sender)

retransmit

protected void retransmit(long first_seqno,
                          long last_seqno,
                          Address sender,
                          boolean multicast_xmit_request)

retransmit

protected void retransmit(SeqnoList missing_msgs,
                          Address sender,
                          boolean multicast_xmit_request)

reset

protected void reset()

sizeOfAllMessages

protected static long sizeOfAllMessages(Table<Message> buf,
                                        boolean include_headers)

startRetransmitTask

protected void startRetransmitTask()

stopRetransmitTask

protected void stopRetransmitTask()

triggerXmit

public void triggerXmit()


Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.