to the hashtable (peer address is the key). All
messages sent to that peer will be added to hashtable.peer_addr.sent_msgs. When we receive a
message from a peer for the first time, another entry will be created and added to the hashtable
(unless already existing). Msgs will then be added to hashtable.peer_addr.received_msgs. This
layer is used to reliably transmit point-to-point messages, that is, either messages sent to a
single receiver (vs. messages multicast to a group) or for example replies to a multicast message. The
sender uses an AckSenderWindow
which retransmits messages for which it hasn't received
an ACK, the receiver uses AckReceiverWindow
which keeps track of the lowest seqno
received so far, and keeps messages in order.
Messages in both AckSenderWindows and AckReceiverWindows will be removed. A message will be removed from
AckSenderWindow when an ACK has been received for it and messages will be removed from AckReceiverWindow
whenever a message is received: the new message is added and then we try to remove as many messages as
possible (until we stop at a gap, or there are no more messages).
- Author:
- Bela Ban
Method Summary |
java.lang.Object |
down(Event evt)
An event is to be sent down the stack. |
java.util.Map<java.lang.String,java.lang.Object> |
dumpStats()
|
void |
expired(Address key)
Called by AgeOutCache, to removed expired connections |
AgeOutCache<Address> |
getAgeOutCache()
|
int |
getAgeOutCacheSize()
|
java.lang.String |
getLocalAddress()
|
long |
getMaxRetransmitTime()
|
java.lang.String |
getMembers()
|
protected short |
getNewConnectionId()
|
long |
getNumAcksReceived()
|
long |
getNumAcksSent()
|
int |
getNumberOfMessagesInReceiveWindows()
|
int |
getNumConnections()
|
long |
getNumMessagesReceived()
|
long |
getNumMessagesSent()
|
int |
getNumReceiveConnections()
|
int |
getNumSendConnections()
|
int |
getNumUnackedMessages()
The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) |
long |
getNumXmits()
|
protected UNICAST.ReceiverEntry |
getOrCreateReceiverEntry(Address sender,
long seqno,
short conn_id)
|
protected UNICAST.ReceiverEntry |
getReceiverEntry(Address sender,
long seqno,
boolean first,
short conn_id)
|
int[] |
getTimeout()
|
long |
getXmitTableMissingMessages()
|
int |
getXmitTableNumCompactions()
|
int |
getXmitTableNumMoves()
|
int |
getXmitTableNumPurges()
|
int |
getXmitTableNumResizes()
|
long |
getXmitTableUndeliveredMessages()
|
protected void |
handleAckReceived(Address sender,
long seqno)
Add the ACK to hashtable.sender.sent_msgs |
protected void |
handleDataReceived(Address sender,
long seqno,
short conn_id,
boolean first,
Message msg,
Event evt)
Check whether the hashtable contains an entry e for sender (create if not). |
protected void |
handleResendingOfFirstMessage(Address sender,
long seqno)
We need to resend our first message with our conn_id |
boolean |
isConnectionReaperRunning()
|
boolean |
isXmitTaskRunning()
|
java.lang.String |
printAgeOutCache()
|
java.lang.String |
printConnections()
|
java.lang.String |
printReceiveWindowMessages()
|
java.lang.String |
printSendWindowMessages()
|
void |
reapIdleConnections()
|
void |
removeAllConnections()
This method is public only so it can be invoked by unit testing, but should not otherwise be used ! |
protected int |
removeAndDeliver(java.util.concurrent.atomic.AtomicBoolean processing,
Table<Message> win)
|
void |
removeConnection(Address mbr)
Removes and resets from connection table (which is already locked). |
void |
removeReceiveConnection(Address mbr)
|
void |
removeSendConnection(Address mbr)
|
void |
resetStats()
|
void |
retransmit(Message msg)
Called by AckSenderWindow to resend messages for which no ACK has been received yet |
protected void |
send(Message msg,
Event evt)
|
protected void |
sendAck(Address dst,
long seqno)
|
protected void |
sendRequestForFirstSeqno(Address dest,
long seqno_received)
|
void |
setMaxMessageBatchSize(int size)
|
void |
setMaxRetransmitTime(long max_retransmit_time)
|
void |
setTimeout(int[] val)
Deprecated. |
void |
start()
This method is called on a Channel.connect(String) . |
protected void |
startConnectionReaper()
|
protected void |
startRetransmitTask()
|
void |
stop()
This method is called on a Channel.disconnect() . |
protected void |
stopConnectionReaper()
|
protected void |
stopRetransmitTask()
|
java.lang.Object |
up(Event evt)
An event was received from the layer below. |
Methods inherited from class org.jgroups.stack.Protocol |
destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, init, isErgonomics, printStats, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled |
Methods inherited from class java.lang.Object |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
DEFAULT_FIRST_SEQNO
public static final long DEFAULT_FIRST_SEQNO
- See Also:
- Constant Field Values
timeout
@Deprecated
protected int[] timeout
- Deprecated.
max_msg_batch_size
protected int max_msg_batch_size
conn_expiry_timeout
protected long conn_expiry_timeout
segment_capacity
@Deprecated
protected int segment_capacity
- Deprecated.
xmit_table_num_rows
protected int xmit_table_num_rows
xmit_table_msgs_per_row
protected int xmit_table_msgs_per_row
xmit_table_resize_factor
protected double xmit_table_resize_factor
xmit_table_max_compaction_time
protected long xmit_table_max_compaction_time
xmit_interval
protected long xmit_interval
num_msgs_sent
protected long num_msgs_sent
num_msgs_received
protected long num_msgs_received
num_acks_sent
protected long num_acks_sent
num_acks_received
protected long num_acks_received
num_xmits
protected long num_xmits
send_table
protected final java.util.concurrent.ConcurrentMap<Address,UNICAST.SenderEntry> send_table
recv_table
protected final java.util.concurrent.ConcurrentMap<Address,UNICAST.ReceiverEntry> recv_table
recv_table_lock
protected final java.util.concurrent.locks.ReentrantLock recv_table_lock
xmit_task
protected java.util.concurrent.Future<?> xmit_task
- RetransmitTask running every xmit_interval ms
members
protected volatile java.util.List<Address> members
local_addr
protected Address local_addr
timer
protected TimeScheduler timer
running
protected volatile boolean running
last_conn_id
protected short last_conn_id
max_retransmit_time
protected long max_retransmit_time
cache
protected AgeOutCache<Address> cache
connection_reaper
protected java.util.concurrent.Future<?> connection_reaper
UNICAST
public UNICAST()
getTimeout
public int[] getTimeout()
setTimeout
@Deprecated
public void setTimeout(int[] val)
- Deprecated.
setMaxMessageBatchSize
public void setMaxMessageBatchSize(int size)
getLocalAddress
public java.lang.String getLocalAddress()
getMembers
public java.lang.String getMembers()
isConnectionReaperRunning
public boolean isConnectionReaperRunning()
getNumSendConnections
public int getNumSendConnections()
getNumReceiveConnections
public int getNumReceiveConnections()
getNumConnections
public int getNumConnections()
printConnections
public java.lang.String printConnections()
getNumMessagesSent
public long getNumMessagesSent()
getNumMessagesReceived
public long getNumMessagesReceived()
getNumAcksSent
public long getNumAcksSent()
getNumAcksReceived
public long getNumAcksReceived()
getNumXmits
public long getNumXmits()
getMaxRetransmitTime
public long getMaxRetransmitTime()
setMaxRetransmitTime
public void setMaxRetransmitTime(long max_retransmit_time)
isXmitTaskRunning
public boolean isXmitTaskRunning()
getAgeOutCacheSize
public int getAgeOutCacheSize()
printAgeOutCache
public java.lang.String printAgeOutCache()
getAgeOutCache
public AgeOutCache<Address> getAgeOutCache()
getNumUnackedMessages
public int getNumUnackedMessages()
- The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)
getNumberOfMessagesInReceiveWindows
public int getNumberOfMessagesInReceiveWindows()
getXmitTableUndeliveredMessages
public long getXmitTableUndeliveredMessages()
getXmitTableMissingMessages
public long getXmitTableMissingMessages()
getXmitTableNumCompactions
public int getXmitTableNumCompactions()
getXmitTableNumMoves
public int getXmitTableNumMoves()
getXmitTableNumResizes
public int getXmitTableNumResizes()
getXmitTableNumPurges
public int getXmitTableNumPurges()
printReceiveWindowMessages
public java.lang.String printReceiveWindowMessages()
printSendWindowMessages
public java.lang.String printSendWindowMessages()
resetStats
public void resetStats()
- Overrides:
resetStats
in class Protocol
dumpStats
public java.util.Map<java.lang.String,java.lang.Object> dumpStats()
- Overrides:
dumpStats
in class Protocol
start
public void start()
throws java.lang.Exception
- Description copied from class:
Protocol
- This method is called on a
Channel.connect(String)
. Starts work.
Protocols are connected and queues are ready to receive events.
Will be called from bottom to top. This call will replace
the START and START_OK events.
- Overrides:
start
in class Protocol
- Throws:
java.lang.Exception
- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
to fail, so Channel.connect(String)
will throw an exception
stop
public void stop()
- Description copied from class:
Protocol
- This method is called on a
Channel.disconnect()
. Stops work (e.g. by closing multicast socket).
Will be called from top to bottom. This means that at the time of the method invocation the
neighbor protocol below is still working. This method will replace the
STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that
when this method is called all messages in the down queue will have been flushed
- Overrides:
stop
in class Protocol
up
public java.lang.Object up(Event evt)
- Description copied from class:
Protocol
- An event was received from the layer below. Usually the current layer will want to examine
the event type and - depending on its type - perform some computation
(e.g. removing headers from a MSG event type, or updating the internal membership list
when receiving a VIEW_CHANGE event).
Finally the event is either a) discarded, or b) an event is sent down
the stack using
down_prot.down()
or c) the event (or another event) is sent up
the stack using up_prot.up()
.
- Overrides:
up
in class Protocol
down
public java.lang.Object down(Event evt)
- Description copied from class:
Protocol
- An event is to be sent down the stack. The layer may want to examine its type and perform
some action on it, depending on the event's type. If the event is a message MSG, then
the layer may need to add a header to it (or do nothing at all) before sending it down
the stack using
down_prot.down()
. In case of a GET_ADDRESS event (which tries to
retrieve the stack's address from one of the bottom layers), the layer may need to send
a new response event back up the stack using up_prot.up()
.
- Overrides:
down
in class Protocol
send
protected void send(Message msg,
Event evt)
removeConnection
public void removeConnection(Address mbr)
- Removes and resets from connection table (which is already locked). Returns true if member was found,
otherwise false. This method is public only so it can be invoked by unit testing, but should not otherwise be
used !
removeSendConnection
public void removeSendConnection(Address mbr)
removeReceiveConnection
public void removeReceiveConnection(Address mbr)
removeAllConnections
public void removeAllConnections()
- This method is public only so it can be invoked by unit testing, but should not otherwise be used !
retransmit
public void retransmit(Message msg)
- Called by AckSenderWindow to resend messages for which no ACK has been received yet
expired
public void expired(Address key)
- Called by AgeOutCache, to removed expired connections
- Specified by:
expired
in interface AgeOutCache.Handler<Address>
- Parameters:
key
-
handleDataReceived
protected void handleDataReceived(Address sender,
long seqno,
short conn_id,
boolean first,
Message msg,
Event evt)
- Check whether the hashtable contains an entry e for
sender
(create if not). If
e.received_msgs is null and first
is true: create a new AckReceiverWindow(seqno) and
add message. Set e.received_msgs to the new window. Else just add the message.
removeAndDeliver
protected int removeAndDeliver(java.util.concurrent.atomic.AtomicBoolean processing,
Table<Message> win)
getReceiverEntry
protected UNICAST.ReceiverEntry getReceiverEntry(Address sender,
long seqno,
boolean first,
short conn_id)
getOrCreateReceiverEntry
protected UNICAST.ReceiverEntry getOrCreateReceiverEntry(Address sender,
long seqno,
short conn_id)
handleAckReceived
protected void handleAckReceived(Address sender,
long seqno)
- Add the ACK to hashtable.sender.sent_msgs
handleResendingOfFirstMessage
protected void handleResendingOfFirstMessage(Address sender,
long seqno)
- We need to resend our first message with our conn_id
- Parameters:
sender
- seqno
- Resend the non null messages in the range [lowest .. seqno]
startRetransmitTask
protected void startRetransmitTask()
stopRetransmitTask
protected void stopRetransmitTask()
sendAck
protected void sendAck(Address dst,
long seqno)
startConnectionReaper
protected void startConnectionReaper()
stopConnectionReaper
protected void stopConnectionReaper()
getNewConnectionId
protected short getNewConnectionId()
sendRequestForFirstSeqno
protected void sendRequestForFirstSeqno(Address dest,
long seqno_received)
reapIdleConnections
public void reapIdleConnections()
Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.