Package org.jgroups.protocols
Class UNICAST3
- java.lang.Object
-
- org.jgroups.stack.Protocol
-
- org.jgroups.protocols.UNICAST3
-
- All Implemented Interfaces:
AgeOutCache.Handler<Address>
public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address>
Reliable unicast protocol using a combination of positive and negative acks. See docs/design/UNICAST3.txt for details.
- Since:
- 3.3
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes
protected class UNICAST3.Entry
protected class UNICAST3.ReceiverEntry
protected class UNICAST3.RetransmitTask
  Retransmitter task which periodically (every xmit_interval ms):
  - If any of the receiver windows have the ack flag set, clears the flag and sends an ack for the highest delivered seqno to the sender
  - Checks all receiver windows for missing messages and asks senders for retransmission
  - For all sender windows, checks if highest acked (HA) < highest sent (HS).
protected class UNICAST3.SenderEntry
protected static class UNICAST3.State
-
Field Summary
Fields
protected int ack_threshold
protected AverageMinMax avg_delivery_batch_size
protected static java.util.function.BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR
protected AgeOutCache<Address> cache
protected long conn_close_timeout
protected long conn_expiry_timeout
protected static long DEFAULT_FIRST_SEQNO
protected static java.util.function.Predicate<Message> dont_loopback_filter
protected java.util.function.Predicate<Message> drop_oob_and_dont_loopback_msgs_filter
protected static Message DUMMY_OOB_MSG
protected boolean is_trace
protected short last_conn_id
protected ExpiryCache<Address> last_sync_sent
  Keep track of when a SEND_FIRST_SEQNO message was sent to a given sender
protected Address local_addr
protected boolean log_not_found_msgs
protected int max_batch_size
protected long max_retransmit_time
protected int max_xmit_req_size
protected java.util.List<Address> members
protected MessageCache msg_cache
protected long num_acks_received
protected long num_acks_sent
protected long num_msgs_received
protected long num_msgs_sent
protected long num_xmits
protected java.util.concurrent.ConcurrentMap<Address,UNICAST3.ReceiverEntry> recv_table
protected java.util.concurrent.locks.ReentrantLock recv_table_lock
protected boolean running
protected java.util.concurrent.ConcurrentMap<Address,UNICAST3.SenderEntry> send_table
protected boolean sends_can_block
protected long sync_min_interval
protected TimeService time_service
protected TimeScheduler timer
protected java.util.concurrent.atomic.AtomicInteger timestamper
protected long xmit_interval
protected java.util.concurrent.atomic.LongAdder xmit_reqs_received
protected java.util.concurrent.atomic.LongAdder xmit_reqs_sent
protected java.util.concurrent.atomic.LongAdder xmit_rsps_sent
protected long xmit_table_max_compaction_time
protected int xmit_table_msgs_per_row
protected int xmit_table_num_rows
protected double xmit_table_resize_factor
protected java.util.concurrent.Future<?> xmit_task
  RetransmitTask running every xmit_interval ms
protected java.util.Map<Address,java.lang.Long> xmit_task_map
  Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539)
-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, log, stack, stats, up_prot
-
-
Constructor Summary
Constructors
UNICAST3()
-
Method Summary
Methods
protected static int accumulate(java.util.function.ToIntFunction<Table> func, java.util.Collection<? extends UNICAST3.Entry>... entries)
protected void addMessage(UNICAST3.ReceiverEntry entry, Address sender, long seqno, Message msg)
protected void addQueuedMessages(Address sender, UNICAST3.ReceiverEntry entry, java.util.Collection<Message> queued_msgs)
void closeConnection(Address mbr)
  Removes and resets from connection table (which is already locked).
void closeIdleConnections()
void closeReceiveConnection(Address mbr)
void closeSendConnection(Address mbr)
protected static int compare(int ts1, int ts2)
  Compares 2 timestamps, handles numeric overflow
protected UNICAST3.ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id)
protected Table createTable(long seqno)
protected void deliverBatch(MessageBatch batch)
protected void deliverMessage(Message msg, Address sender, long seqno)
java.lang.Object down(Event evt)
  An event is to be sent down the stack.
java.lang.Object down(Message msg)
  A message is sent down the stack.
void expired(Address key)
  Called by AgeOutCache to remove expired connections
int getAckThreshold()
AgeOutCache<Address> getAgeOutCache()
int getAgeOutCacheSize()
java.lang.String getAvgBatchDeliverySize()
java.lang.String getLocalAddress()
long getMaxRetransmitTime()
protected short getNewConnectionId()
long getNumAcksReceived()
long getNumAcksSent()
int getNumConnections()
long getNumMessagesReceived()
long getNumMessagesSent()
int getNumReceiveConnections()
int getNumSendConnections()
int getNumUnackedMessages()
  The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)
long getNumXmits()
protected UNICAST3.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id)
protected UNICAST3.SenderEntry getSenderEntry(Address dst)
protected long getTimestamp()
int getTimestamper()
int getXmitTableDeliverableMessages()
int getXmitTableMissingMessages()
int getXmitTableMsgsPerRow()
int getXmitTableNumCompactions()
int getXmitTableNumMoves()
int getXmitTableNumPurges()
int getXmitTableNumResizes()
int getXmitTableNumRows()
int getXmitTableUndeliveredMessages()
protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp)
  Add the ACK to hashtable.sender.sent_msgs
protected void handleBatchFromSelf(MessageBatch batch, UNICAST3.Entry entry)
protected void handleBatchReceived(UNICAST3.ReceiverEntry entry, Address sender, java.util.List<LongTuple<Message>> msgs, boolean oob)
protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg)
  Check whether the hashtable contains an entry e for sender (create if not).
protected void handleDataReceivedFromSelf(Address sender, long seqno, Message msg)
  Called when the sender of a message is the local member.
protected void handleResendingOfFirstMessage(Address sender, int timestamp)
  We need to resend the first message with our conn_id
protected void handleUpEvent(Address sender, Message msg, UnicastHeader3 hdr)
protected void handleXmitRequest(Address sender, SeqnoList missing)
boolean hasSendConnectionTo(Address dest)
  Used for testing only
void init()
  Called after instance has been created (null constructor) and before protocol is started.
boolean isXmitTaskRunning()
java.lang.String printAgeOutCache()
java.lang.String printConnections()
protected java.lang.String printMessageList(java.util.List<LongTuple<Message>> list)
java.lang.String printReceiveWindowMessages()
java.lang.String printSendWindowMessages()
protected void processInternalMessage(Table<Message> win, Address sender)
void removeAllConnections()
  This method is public only so it can be invoked by unit testing, but should not otherwise be used.
protected void removeAndDeliver(Table<Message> win, Address sender)
  Try to remove as many messages as possible from the table and pass them up.
int removeConnections(boolean remove_send_connections, boolean remove_receive_connections)
  Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).
int removeExpiredConnections()
protected void removeReceiveConnection(Address mbr)
protected void removeSendConnection(Address mbr)
void resetStats()
protected void retransmit(Message msg)
  Called by the sender to resend messages for which no ACK has been received yet
protected void retransmit(SeqnoList missing, Address sender)
  Sends a retransmit request to the given sender
protected void sendAck(Address dst, long seqno, short conn_id)
protected void sendAckFor(Address dest)
void sendClose(Address dest, short conn_id)
void sendPendingAcks()
protected void sendRequestForFirstSeqno(Address dest)
UNICAST3 setAckThreshold(int ack_threshold)
<T extends Protocol> T setLevel(java.lang.String level)
  Sets the level of a logger.
void setMaxRetransmitTime(long max_retransmit_time)
<T extends UNICAST3> T setXmitInterval(long interval)
UNICAST3 setXmitTableMsgsPerRow(int xmit_table_msgs_per_row)
UNICAST3 setXmitTableNumRows(int xmit_table_num_rows)
void start()
  This method is called on a JChannel.connect(String).
protected void startRetransmitTask()
void stop()
  This method is called on a JChannel.disconnect().
protected void stopRetransmitTask()
void triggerXmit()
java.lang.Object up(Message msg)
  A single message was received.
void up(MessageBatch batch)
  Sends up multiple messages in a MessageBatch.
protected void update(UNICAST3.Entry entry, int num_received)
-
Methods inherited from class org.jgroups.stack.Protocol
accept, afterCreationHook, destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, up
-
-
-
-
Field Detail
-
DEFAULT_FIRST_SEQNO
protected static final long DEFAULT_FIRST_SEQNO
- See Also:
- Constant Field Values
-
conn_expiry_timeout
protected long conn_expiry_timeout
-
conn_close_timeout
protected long conn_close_timeout
-
xmit_table_num_rows
protected int xmit_table_num_rows
-
xmit_table_msgs_per_row
protected int xmit_table_msgs_per_row
-
xmit_table_resize_factor
protected double xmit_table_resize_factor
-
xmit_table_max_compaction_time
protected long xmit_table_max_compaction_time
-
max_retransmit_time
protected long max_retransmit_time
-
xmit_interval
protected long xmit_interval
-
log_not_found_msgs
protected boolean log_not_found_msgs
-
ack_threshold
protected int ack_threshold
-
sync_min_interval
protected long sync_min_interval
-
max_xmit_req_size
protected int max_xmit_req_size
-
max_batch_size
protected int max_batch_size
-
num_msgs_sent
protected long num_msgs_sent
-
num_msgs_received
protected long num_msgs_received
-
num_acks_sent
protected long num_acks_sent
-
num_acks_received
protected long num_acks_received
-
num_xmits
protected long num_xmits
-
xmit_reqs_received
protected final java.util.concurrent.atomic.LongAdder xmit_reqs_received
-
xmit_reqs_sent
protected final java.util.concurrent.atomic.LongAdder xmit_reqs_sent
-
xmit_rsps_sent
protected final java.util.concurrent.atomic.LongAdder xmit_rsps_sent
-
avg_delivery_batch_size
protected final AverageMinMax avg_delivery_batch_size
-
sends_can_block
protected boolean sends_can_block
-
is_trace
protected boolean is_trace
-
send_table
protected final java.util.concurrent.ConcurrentMap<Address,UNICAST3.SenderEntry> send_table
-
recv_table
protected final java.util.concurrent.ConcurrentMap<Address,UNICAST3.ReceiverEntry> recv_table
-
recv_table_lock
protected final java.util.concurrent.locks.ReentrantLock recv_table_lock
-
xmit_task_map
protected final java.util.Map<Address,java.lang.Long> xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539)
-
xmit_task
protected java.util.concurrent.Future<?> xmit_task
RetransmitTask running every xmit_interval ms
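A task that runs every xmit_interval ms and is tracked through a Future can be sketched with the JDK's ScheduledExecutorService. This is only an illustration under stated assumptions: UNICAST3 schedules through its own TimeScheduler (the timer field), and the class PeriodicXmitSketch, its run counter, and the choice of a fixed delay are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a protocol that runs a retransmit task periodically.
final class PeriodicXmitSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger runs = new AtomicInteger(); // counts task executions
    private Future<?> xmitTask;                             // plays the role of xmit_task

    // Starts the periodic task unless it is already running.
    synchronized void startRetransmitTask(long xmitIntervalMs) {
        if (xmitTask == null || xmitTask.isDone())
            xmitTask = timer.scheduleWithFixedDelay(runs::incrementAndGet, 0, xmitIntervalMs, TimeUnit.MILLISECONDS);
    }

    // Cancels the periodic task if one is scheduled.
    synchronized void stopRetransmitTask() {
        if (xmitTask != null) {
            xmitTask.cancel(true);
            xmitTask = null;
        }
    }

    synchronized boolean isXmitTaskRunning() {
        return xmitTask != null && !xmitTask.isDone();
    }

    int runs() { return runs.get(); }

    void shutdown() { timer.shutdownNow(); }
}
```

Keeping the Future in a field is what lets a method such as isXmitTaskRunning() answer without touching the scheduler itself.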
-
members
protected volatile java.util.List<Address> members
-
local_addr
protected Address local_addr
-
timer
protected TimeScheduler timer
-
running
protected volatile boolean running
-
last_conn_id
protected short last_conn_id
-
cache
protected AgeOutCache<Address> cache
-
time_service
protected TimeService time_service
-
timestamper
protected final java.util.concurrent.atomic.AtomicInteger timestamper
-
last_sync_sent
protected ExpiryCache<Address> last_sync_sent
Keep track of when a SEND_FIRST_SEQNO message was sent to a given sender
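One plausible use of such a record, assumed here rather than stated by this page, is to avoid sending the same request to a sender again within a minimum interval. The sketch below uses only JDK classes; LastSyncSentSketch, shouldSend, and the String addresses are hypothetical and do not reproduce the ExpiryCache API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-sender rate limiter: remembers until when a request counts as recent.
final class LastSyncSentSketch {
    private final ConcurrentMap<String, Long> expiry = new ConcurrentHashMap<>();
    private final long minIntervalMs;

    LastSyncSentSketch(long minIntervalMs) { this.minIntervalMs = minIntervalMs; }

    // Returns true if no request was recorded for this sender within the interval,
    // and in that case records a new expiry time atomically.
    boolean shouldSend(String sender, long nowMs) {
        boolean[] send = {false};
        expiry.compute(sender, (key, until) -> {
            if (until == null || nowMs >= until) {
                send[0] = true;
                return nowMs + minIntervalMs;
            }
            return until; // still within the interval: keep the old expiry
        });
        return send[0];
    }
}
```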
-
msg_cache
protected final MessageCache msg_cache
-
DUMMY_OOB_MSG
protected static final Message DUMMY_OOB_MSG
-
drop_oob_and_dont_loopback_msgs_filter
protected final java.util.function.Predicate<Message> drop_oob_and_dont_loopback_msgs_filter
-
dont_loopback_filter
protected static final java.util.function.Predicate<Message> dont_loopback_filter
-
BATCH_ACCUMULATOR
protected static final java.util.function.BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR
-
-
Method Detail
-
getLocalAddress
public java.lang.String getLocalAddress()
-
getNumSendConnections
public int getNumSendConnections()
-
getNumReceiveConnections
public int getNumReceiveConnections()
-
getNumConnections
public int getNumConnections()
-
getTimestamper
public int getTimestamper()
-
getAvgBatchDeliverySize
public java.lang.String getAvgBatchDeliverySize()
-
getAckThreshold
public int getAckThreshold()
-
setAckThreshold
public UNICAST3 setAckThreshold(int ack_threshold)
-
setLevel
public <T extends Protocol> T setLevel(java.lang.String level)
Description copied from class: Protocol
Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
-
setXmitInterval
public <T extends UNICAST3> T setXmitInterval(long interval)
-
getXmitTableNumRows
public int getXmitTableNumRows()
-
setXmitTableNumRows
public UNICAST3 setXmitTableNumRows(int xmit_table_num_rows)
-
getXmitTableMsgsPerRow
public int getXmitTableMsgsPerRow()
-
setXmitTableMsgsPerRow
public UNICAST3 setXmitTableMsgsPerRow(int xmit_table_msgs_per_row)
-
printConnections
public java.lang.String printConnections()
-
getNumMessagesSent
public long getNumMessagesSent()
-
getNumMessagesReceived
public long getNumMessagesReceived()
-
getNumAcksSent
public long getNumAcksSent()
-
getNumAcksReceived
public long getNumAcksReceived()
-
getNumXmits
public long getNumXmits()
-
getMaxRetransmitTime
public long getMaxRetransmitTime()
-
setMaxRetransmitTime
public void setMaxRetransmitTime(long max_retransmit_time)
-
isXmitTaskRunning
public boolean isXmitTaskRunning()
-
getAgeOutCacheSize
public int getAgeOutCacheSize()
-
printAgeOutCache
public java.lang.String printAgeOutCache()
-
getAgeOutCache
public AgeOutCache<Address> getAgeOutCache()
-
hasSendConnectionTo
public boolean hasSendConnectionTo(Address dest)
Used for testing only
-
getNumUnackedMessages
public int getNumUnackedMessages()
The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)
-
getXmitTableUndeliveredMessages
public int getXmitTableUndeliveredMessages()
-
getXmitTableMissingMessages
public int getXmitTableMissingMessages()
-
getXmitTableDeliverableMessages
public int getXmitTableDeliverableMessages()
-
getXmitTableNumCompactions
public int getXmitTableNumCompactions()
-
getXmitTableNumMoves
public int getXmitTableNumMoves()
-
getXmitTableNumResizes
public int getXmitTableNumResizes()
-
getXmitTableNumPurges
public int getXmitTableNumPurges()
-
printReceiveWindowMessages
public java.lang.String printReceiveWindowMessages()
-
printSendWindowMessages
public java.lang.String printSendWindowMessages()
-
resetStats
public void resetStats()
- Overrides:
resetStats in class Protocol
-
init
public void init() throws java.lang.Exception
Description copied from class: Protocol
Called after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
-
start
public void start() throws java.lang.Exception
Description copied from class: Protocol
This method is called on a JChannel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.
- Overrides:
start in class Protocol
- Throws:
java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
-
stop
public void stop()
Description copied from class: Protocol
This method is called on a JChannel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed
-
up
public java.lang.Object up(Message msg)
Description copied from class: Protocol
A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
handleUpEvent
protected void handleUpEvent(Address sender, Message msg, UnicastHeader3 hdr)
-
up
public void up(MessageBatch batch)
Description copied from class: Protocol
Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.
The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(org.jgroups.Message)), and, if true, calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.getMatchingMessages(short,boolean)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
-
handleBatchFromSelf
protected void handleBatchFromSelf(MessageBatch batch, UNICAST3.Entry entry)
-
down
public java.lang.Object down(Event evt)
Description copied from class: Protocol
An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
-
down
public java.lang.Object down(Message msg)
Description copied from class: Protocol
A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it before passing it down.
-
closeConnection
public void closeConnection(Address mbr)
Removes and resets the connection to mbr from the connection table (which is already locked). This method is public only so it can be invoked by unit testing, but should not otherwise be used.
-
closeSendConnection
public void closeSendConnection(Address mbr)
-
closeReceiveConnection
public void closeReceiveConnection(Address mbr)
-
removeSendConnection
protected void removeSendConnection(Address mbr)
-
removeReceiveConnection
protected void removeReceiveConnection(Address mbr)
-
removeAllConnections
public void removeAllConnections()
This method is public only so it can be invoked by unit testing, but should not otherwise be used.
-
retransmit
protected void retransmit(SeqnoList missing, Address sender)
Sends a retransmit request to the given sender
-
retransmit
protected void retransmit(Message msg)
Called by the sender to resend messages for which no ACK has been received yet
-
expired
public void expired(Address key)
Called by AgeOutCache to remove expired connections
- Specified by:
expired in interface AgeOutCache.Handler<Address>
- Parameters:
key -
-
handleDataReceived
protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg)
Check whether the hashtable contains an entry e for sender (create if not). If e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and add message. Set e.received_msgs to the new window. Else just add the message.
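The get-or-create logic described above can be sketched with plain JDK collections. This is a minimal illustration, not the protocol's code: ReceiveTableSketch, its Entry class, the String sender and message types, and the TreeMap used as a window are hypothetical, and dropping a non-first message when no window exists is an assumption of the sketch.

```java
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical receive table: one entry per sender, window created lazily.
final class ReceiveTableSketch {
    static final class Entry {
        TreeMap<Long, String> receivedMsgs; // null until the first message arrives
    }

    private final ConcurrentMap<String, Entry> recvTable = new ConcurrentHashMap<>();

    // Returns true if the message was added to the sender's window.
    boolean handleDataReceived(String sender, long seqno, boolean first, String msg) {
        Entry e = recvTable.computeIfAbsent(sender, key -> new Entry()); // create if not present
        synchronized (e) {
            if (e.receivedMsgs == null) {
                if (!first)
                    return false; // assumption: no window yet and not the first message
                e.receivedMsgs = new TreeMap<>(); // new window, starting with this seqno
            }
            e.receivedMsgs.put(seqno, msg); // otherwise just add the message
            return true;
        }
    }

    int numReceived(String sender) {
        Entry e = recvTable.get(sender);
        if (e == null)
            return 0;
        synchronized (e) {
            return e.receivedMsgs == null ? 0 : e.receivedMsgs.size();
        }
    }
}
```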
-
addMessage
protected void addMessage(UNICAST3.ReceiverEntry entry, Address sender, long seqno, Message msg)
-
addQueuedMessages
protected void addQueuedMessages(Address sender, UNICAST3.ReceiverEntry entry, java.util.Collection<Message> queued_msgs)
-
handleDataReceivedFromSelf
protected void handleDataReceivedFromSelf(Address sender, long seqno, Message msg)
Called when the sender of a message is the local member. In this case, we don't need to add the message to the table as the sender already did that
-
handleBatchReceived
protected void handleBatchReceived(UNICAST3.ReceiverEntry entry, Address sender, java.util.List<LongTuple<Message>> msgs, boolean oob)
-
removeAndDeliver
protected void removeAndDeliver(Table<Message> win, Address sender)
Try to remove as many messages as possible from the table and pass them up. Prevents concurrent passing up of messages by different threads (http://jira.jboss.com/jira/browse/JGRP-198); lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time. We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered in the order in which they were sent
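The "only 1 is allowed to pass at a time" rule can be sketched with an atomic counter that elects a single draining thread per window. The class SingleDelivererSketch, its queue, and the delivered list are hypothetical; the sketch shows only the gate, not seqno-based ordering or the Table class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-sender window: many threads add, at most one delivers at a time.
final class SingleDelivererSketch {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final AtomicInteger adders = new AtomicInteger();
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    void addAndDeliver(String msg) {
        pending.add(msg);
        // Only the thread that moves the counter from 0 to 1 becomes the deliverer.
        if (adders.getAndIncrement() != 0)
            return; // the active deliverer sees the raised counter and drains again
        do {
            String m;
            while ((m = pending.poll()) != null)
                delivered.add(m); // stands in for passing the message up the stack
        } while (adders.decrementAndGet() != 0);
    }

    List<String> delivered() { return new ArrayList<>(delivered); }
}
```

Because a message is enqueued before the counter is incremented, a thread that returns early can rely on the active deliverer making another pass and picking the message up. Using one such object per sender keeps delivery for different senders independent.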
-
printMessageList
protected java.lang.String printMessageList(java.util.List<LongTuple<Message>> list)
-
getReceiverEntry
protected UNICAST3.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id)
-
getSenderEntry
protected UNICAST3.SenderEntry getSenderEntry(Address dst)
-
createReceiverEntry
protected UNICAST3.ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id)
-
createTable
protected Table createTable(long seqno)
-
handleAckReceived
protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp)
Add the ACK to hashtable.sender.sent_msgs
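Applying an ack to a table of sent messages can be sketched as a purge of everything up to the acked seqno. This assumes cumulative acks, which the page does not state for this method; SentMsgsSketch and its String payloads are hypothetical.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical table of sent, not yet acknowledged messages, keyed by seqno.
final class SentMsgsSketch {
    private final ConcurrentSkipListMap<Long, String> sentMsgs = new ConcurrentSkipListMap<>();

    void sent(long seqno, String msg) { sentMsgs.put(seqno, msg); }

    // Assumption: an ack for seqno acknowledges every message up to and including seqno.
    // Returns the number of messages removed from the table.
    int handleAckReceived(long seqno) {
        ConcurrentNavigableMap<Long, String> acked = sentMsgs.headMap(seqno, true);
        int n = acked.size();
        acked.clear(); // clearing the view removes the entries from the backing map
        return n;
    }

    int numUnacked() { return sentMsgs.size(); }
}
```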
-
handleResendingOfFirstMessage
protected void handleResendingOfFirstMessage(Address sender, int timestamp)
We need to resend the first message with our conn_id
- Parameters:
sender -
-
deliverBatch
protected void deliverBatch(MessageBatch batch)
-
getTimestamp
protected long getTimestamp()
-
startRetransmitTask
protected void startRetransmitTask()
-
stopRetransmitTask
protected void stopRetransmitTask()
-
sendAck
protected void sendAck(Address dst, long seqno, short conn_id)
-
getNewConnectionId
protected short getNewConnectionId()
-
sendRequestForFirstSeqno
protected void sendRequestForFirstSeqno(Address dest)
-
sendClose
public void sendClose(Address dest, short conn_id)
-
closeIdleConnections
public void closeIdleConnections()
-
removeExpiredConnections
public int removeExpiredConnections()
-
removeConnections
public int removeConnections(boolean remove_send_connections, boolean remove_receive_connections)
Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).
- Parameters:
remove_send_connections - If true, send connections whose state is !OPEN are destroyed and removed
remove_receive_connections - If true, receive connections with state !OPEN are destroyed and removed
- Returns:
- The number of connections which were removed
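The contract above (remove every connection whose state is not OPEN and return how many were removed) can be sketched over two concurrent maps. ConnectionTableSketch, its State enum, and the String keys are hypothetical simplifications; the real tables hold SenderEntry and ReceiverEntry objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical connection tables mapping a peer to its connection state.
final class ConnectionTableSketch {
    enum State { OPEN, CLOSING, CLOSED }

    final ConcurrentMap<String, State> sendTable = new ConcurrentHashMap<>();
    final ConcurrentMap<String, State> recvTable = new ConcurrentHashMap<>();

    // Removes connections whose state is not OPEN; returns the number removed.
    int removeConnections(boolean removeSendConnections, boolean removeReceiveConnections) {
        int removed = 0;
        if (removeSendConnections)
            removed += removeNotOpen(sendTable);
        if (removeReceiveConnections)
            removed += removeNotOpen(recvTable);
        return removed;
    }

    private static int removeNotOpen(ConcurrentMap<String, State> table) {
        int removed = 0;
        for (Map.Entry<String, State> e : table.entrySet()) {
            // remove(key, value) only succeeds if the state did not change in the meantime
            if (e.getValue() != State.OPEN && table.remove(e.getKey(), e.getValue()))
                removed++;
        }
        return removed;
    }
}
```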
-
triggerXmit
public void triggerXmit()
-
sendPendingAcks
public void sendPendingAcks()
-
sendAckFor
protected void sendAckFor(Address dest)
-
update
protected void update(UNICAST3.Entry entry, int num_received)
-
compare
protected static int compare(int ts1, int ts2)
Compares 2 timestamps, handles numeric overflow
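One common way to compare int timestamps that may wrap around is to look at the sign of their difference instead of comparing the values directly. The sketch below illustrates that technique; it is an assumption about how such a comparison can be written, not a copy of this method's body, and TimestampCompareSketch is a hypothetical name.

```java
// Hypothetical helper showing a wrap-around-tolerant comparison of int timestamps.
final class TimestampCompareSketch {
    // Returns a negative value, 0, or a positive value if ts1 is older than, equal to,
    // or newer than ts2, as long as the two are less than 2^31 apart.
    static int compare(int ts1, int ts2) {
        int diff = ts1 - ts2; // wraps on overflow, which is what makes this work
        return Integer.compare(diff, 0);
    }
}
```

With this scheme a counter that has just wrapped from Integer.MAX_VALUE to Integer.MIN_VALUE still counts as newer, whereas Integer.compare(Integer.MIN_VALUE, Integer.MAX_VALUE) would report it as older.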
-
accumulate
@SafeVarargs protected static int accumulate(java.util.function.ToIntFunction<Table> func, java.util.Collection<? extends UNICAST3.Entry>... entries)
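A signature of this shape suggests summing a per-table metric over several collections of entries, which is how statistics such as getNumUnackedMessages() could be computed. The sketch below is a guess at that pattern: the Table and Entry classes are hypothetical stand-ins with only the members the example needs, and skipping entries without a table is an assumption.

```java
import java.util.Collection;
import java.util.function.ToIntFunction;

final class AccumulateSketch {
    // Hypothetical stand-in for a message table that knows its size.
    static final class Table {
        private final int size;
        Table(int size) { this.size = size; }
        int size() { return size; }
    }

    // Hypothetical stand-in for an entry holding a table (possibly null).
    static class Entry {
        final Table msgs;
        Entry(Table msgs) { this.msgs = msgs; }
    }

    // Applies func to the table of every entry in every collection and sums the results.
    @SafeVarargs
    static int accumulate(ToIntFunction<Table> func, Collection<? extends Entry>... entries) {
        int total = 0;
        for (Collection<? extends Entry> coll : entries)
            for (Entry e : coll)
                if (e.msgs != null) // assumption: entries without a table contribute nothing
                    total += func.applyAsInt(e.msgs);
        return total;
    }
}
```

A call such as accumulate(Table::size, sendEntries, recvEntries) then yields one total across both collections.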
-
-