Package org.jgroups.protocols
Class ReliableUnicast
- java.lang.Object
-
- org.jgroups.stack.Protocol
-
- org.jgroups.protocols.ReliableUnicast
-
- All Implemented Interfaces:
Lifecycle
,AgeOutCache.Handler<Address>
- Direct Known Subclasses:
UNICAST4
public abstract class ReliableUnicast extends Protocol implements AgeOutCache.Handler<Address>
Base class for reliable unicast protocols- Since:
- 5.4
- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
ReliableUnicast.Entry
class
ReliableUnicast.ReceiverEntry
protected class
ReliableUnicast.RetransmitTask
Retransmitter task which periodically (every xmit_interval ms): If any of the receiver windows have the ack flag set, clears the flag and sends an ack for the highest delivered seqno to the sender Checks all receiver windows for missing messages and asks senders for retransmission For all sender windows, checks if highest acked (HA) < highest sent (HS).protected class
ReliableUnicast.SenderEntry
protected static class
ReliableUnicast.State
-
Field Summary
Fields Modifier and Type Field Description protected AverageMinMax
avg_delivery_batch_size
protected static java.util.function.BiConsumer<MessageBatch,Message>
BATCH_ACCUMULATOR
protected AgeOutCache<Address>
cache
protected java.util.Map<Address,MessageBatch>
cached_batches
To cache batches for sending messages up the stack (https://issues.redhat.com/browse/JGRP-2841).protected long
conn_close_timeout
protected long
conn_expiry_timeout
protected static long
DEFAULT_FIRST_SEQNO
protected static int
DEFAULT_INCREMENT
protected static int
DEFAULT_INITIAL_CAPACITY
protected static long
DEFAULT_XMIT_INTERVAL
protected java.util.function.Predicate<Message>
drop_oob_and_dont_loopback_msgs_filter
protected static Message
DUMMY_OOB_MSG
protected boolean
is_trace
protected short
last_conn_id
protected ExpiryCache<Address>
last_sync_sent
Keep track of when a SEND_FIRST_SEQNO message was sent to a given senderprotected boolean
log_not_found_msgs
protected boolean
loopback
protected int
max_batch_size
protected long
max_retransmit_time
protected int
max_xmit_req_size
protected java.util.List<Address>
members
protected MessageCache
msg_cache
protected java.util.concurrent.atomic.LongAdder
num_acks_received
protected java.util.concurrent.atomic.LongAdder
num_acks_sent
protected java.util.concurrent.atomic.LongAdder
num_loopbacks
protected java.util.concurrent.atomic.LongAdder
num_msgs_received
protected java.util.concurrent.atomic.LongAdder
num_msgs_sent
protected java.util.concurrent.atomic.LongAdder
num_xmits
protected java.util.Map<Address,ReliableUnicast.ReceiverEntry>
recv_table
protected java.util.concurrent.locks.ReentrantLock
recv_table_lock
protected boolean
relay_present
protected static java.util.function.Predicate<Message>
remove_filter
protected boolean
reuse_message_batches
protected boolean
running
protected boolean
send_atomically
protected java.util.Map<Address,ReliableUnicast.SenderEntry>
send_table
protected boolean
sends_can_block
protected long
sync_min_interval
protected TimeService
time_service
protected TimeScheduler
timer
protected java.util.concurrent.atomic.AtomicInteger
timestamper
protected long
xmit_interval
protected java.util.concurrent.atomic.LongAdder
xmit_reqs_received
protected java.util.concurrent.atomic.LongAdder
xmit_reqs_sent
protected java.util.concurrent.atomic.LongAdder
xmit_rsps_sent
protected java.util.concurrent.Future<?>
xmit_task
RetransmitTask running every xmit_interval msprotected java.util.Map<Address,java.lang.Long>
xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per member (applicable only for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539protected boolean
xmits_enabled
-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, local_addr, log, policies, stack, stats, up_prot
-
-
Constructor Summary
Constructors Constructor Description ReliableUnicast()
-
Method Summary
All Methods Static Methods Instance Methods Abstract Methods Concrete Methods Deprecated Methods Modifier and Type Method Description ReliableUnicast.ReceiverEntry
_getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest)
protected static int
accumulate(java.util.function.ToIntFunction<Buffer<Message>> func, java.util.Collection<? extends ReliableUnicast.Entry>... entries)
protected void
addMessage(ReliableUnicast.ReceiverEntry entry, Address sender, long seqno, Message msg)
protected void
addQueuedMessages(Address sender, ReliableUnicast.ReceiverEntry entry, java.util.Collection<Message> queued_msgs)
protected boolean
addToSendBuffer(Buffer<Message> win, long seq, Message msg, java.util.function.Predicate<Message> filter, boolean dont_block)
Adds the message to the send buffer.ReliableUnicast
clearCachedBatches()
void
closeConnection(Address mbr)
Removes and resets from connection table (which is already locked).void
closeIdleConnections()
void
closeReceiveConnection(Address mbr)
void
closeSendConnection(Address mbr)
protected static int
compare(int ts1, int ts2)
Compares 2 timestamps, handles numeric overflowprotected ReliableUnicast.ReceiverEntry
compareConnIds(short other, short mine, boolean first, ReliableUnicast.ReceiverEntry e, Address sender, long seqno, Address real_dest)
protected abstract Buffer<Message>
createBuffer(long initial_seqno)
protected ReliableUnicast.ReceiverEntry
createReceiverEntry(Address sender, long seqno, short conn_id, Address dest)
protected void
deliverBatch(MessageBatch batch, ReliableUnicast.Entry entry, Address original_dest)
protected void
deliverMessage(Message msg, Address sender, long seqno)
java.lang.Object
down(Event evt)
An event is to be sent down the stack.java.lang.Object
down(Message msg)
A message is sent down the stack.void
expired(Address key)
Called by AgeOutCache, to removed expired connectionsAgeOutCache<Address>
getAgeOutCache()
int
getAgeOutCacheSize()
long
getConnCloseTimeout()
long
getConnExpiryTimeout()
protected static Tuple<java.lang.Long,java.lang.Boolean>
getLowestSeqno(short prot_id, java.util.List<LongTuple<Message>> list)
long
getMaxRetransmitTime()
int
getMaxXmitReqSize()
protected short
getNewConnectionId()
long
getNumAcksReceived()
long
getNumAcksSent()
int
getNumConnections()
long
getNumLoopbacks()
long
getNumMessagesReceived()
Deprecated.long
getNumMessagesSent()
Deprecated.int
getNumReceiveConnections()
int
getNumSendConnections()
int
getNumUnackedMessages()
The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)long
getNumXmits()
protected ReliableUnicast.ReceiverEntry
getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest)
protected ReliableUnicast.SenderEntry
getSenderEntry(Address dst)
long
getSyncMinInterval()
protected long
getTimestamp()
int
getTimestamper()
long
getXmitInterval()
int
getXmitTableDeliverableMessages()
int
getXmitTableMissingMessages()
int
getXmitTableUndeliveredMessages()
protected void
handleAckReceived(Address sender, long seqno, short conn_id, int timestamp)
Add the ACK to hashtable.sender.sent_msgsprotected void
handleBatchFromSelf(MessageBatch batch, ReliableUnicast.Entry entry)
protected void
handleBatchReceived(ReliableUnicast.ReceiverEntry entry, Address sender, java.util.List<LongTuple<Message>> msgs, boolean oob, Address original_dest)
protected void
handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg)
Check whether the hashtable contains an entry e forsender
(create if not).protected void
handleDataReceivedFromSelf(Address sender, long seqno, Message msg)
Called when the sender of a message is the local member.protected void
handleResendingOfFirstMessage(Address sender, int timestamp)
We need to resend the first message with our conn_idprotected void
handleUpEvent(Address sender, Message msg, UnicastHeader hdr)
protected void
handleXmitRequest(Address sender, SeqnoList missing)
boolean
hasSendConnectionTo(Address dest)
Used for testing onlyvoid
init()
Called after a protocol has been created and before the protocol is started.protected static boolean
isCallerRunsHandler(java.util.concurrent.RejectedExecutionHandler h)
protected boolean
isLocal(Address addr)
protected boolean
isLocalSiteMaster(Address dest)
boolean
isXmitsEnabled()
boolean
isXmitTaskRunning()
ReliableUnicast
lastSync(ExpiryCache<Address> c)
boolean
logNotFoundMsgs()
ReliableUnicast
logNotFoundMsgs(boolean l)
boolean
loopback()
ReliableUnicast
loopback(boolean b)
protected static short
max(short a, short b)
protected abstract boolean
needToSendAck(ReliableUnicast.Entry e, int num_acks)
java.lang.String
printAgeOutCache()
java.lang.String
printCachedBatches()
java.lang.String
printConnections()
protected java.lang.String
printMessageList(java.util.List<LongTuple<Message>> list)
java.lang.String
printReceiveWindowMessages()
java.lang.String
printSendWindowMessages()
void
removeAllConnections()
This method is public only so it can be invoked by unit testing, but should not otherwise be used !protected void
removeAndDeliver(ReliableUnicast.Entry entry, Address sender, AsciiString cluster, int min_size)
Try to remove as many messages as possible from the table as pass them up.int
removeConnections(boolean remove_send_connections, boolean remove_receive_connections)
Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).int
removeExpiredConnections()
void
removeReceiveConnection(Address mbr)
void
removeSendConnection(Address mbr)
protected void
resend(Message msg)
void
resetStats()
protected void
retransmit(Message msg)
Called by the sender to resend messages for which no ACK has been received yetprotected void
retransmit(SeqnoList missing, Address sender, Address real_dest)
Sends a retransmit request to the given senderboolean
reuseMessageBatches()
ReliableUnicast
reuseMessageBatches(boolean b)
protected boolean
send(Message msg, ReliableUnicast.SenderEntry entry, boolean dont_loopback_set, boolean dont_block)
protected void
sendAck(Address dst, ReliableUnicast.Entry entry, Address real_dest)
boolean
sendAtomically()
ReliableUnicast
sendAtomically(boolean f)
void
sendClose(Address dest, short conn_id)
protected Buffer.Options
sendOptions()
void
sendPendingAcks()
protected void
sendRequestForFirstSeqno(Address dest, Address original_dest)
boolean
sendsCanBlock()
ReliableUnicast
sendsCanBlock(boolean s)
ReliableUnicast
setConnCloseTimeout(long c)
ReliableUnicast
setConnExpiryTimeout(long c)
<T extends Protocol>
TsetLevel(java.lang.String level)
Sets the level of a logger.ReliableUnicast
setMaxRetransmitTime(long max_retransmit_time)
ReliableUnicast
setMaxXmitReqSize(int m)
ReliableUnicast
setSyncMinInterval(long s)
ReliableUnicast
setXmitInterval(long i)
ReliableUnicast
setXmitsEnabled(boolean b)
void
start()
This method is called on aJChannel.connect(String)
; starts work.void
startRetransmitTask()
void
stop()
Called on aJChannel.disconnect()
; stops work (e.g.void
stopRetransmitTask()
ReliableUnicast
timeService(TimeService ts)
void
triggerXmit()
ReliableUnicast
trimCachedBatches()
java.lang.Object
up(Message msg)
A single message was received.void
up(MessageBatch batch)
Sends up a multiple messages in aMessageBatch
.protected void
update(ReliableUnicast.Entry entry, int num_received)
-
Methods inherited from class org.jgroups.stack.Protocol
accept, addPolicy, addr, addr, afterCreationHook, destroy, down, enableStats, getAddress, getComponents, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getPolicies, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, policies, providedDownServices, providedUpServices, removePolicy, requiredDownServices, requiredUpServices, resetStatistics, setAddress, setDownProtocol, setErgonomics, setId, setPolicies, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, toString, up
-
-
-
-
Field Detail
-
DEFAULT_FIRST_SEQNO
protected static final long DEFAULT_FIRST_SEQNO
- See Also:
- Constant Field Values
-
DEFAULT_XMIT_INTERVAL
protected static final long DEFAULT_XMIT_INTERVAL
- See Also:
- Constant Field Values
-
conn_expiry_timeout
protected long conn_expiry_timeout
-
conn_close_timeout
protected long conn_close_timeout
-
max_retransmit_time
protected long max_retransmit_time
-
xmit_interval
protected long xmit_interval
-
xmits_enabled
protected boolean xmits_enabled
-
log_not_found_msgs
protected boolean log_not_found_msgs
-
sync_min_interval
protected long sync_min_interval
-
max_xmit_req_size
protected int max_xmit_req_size
-
max_batch_size
protected int max_batch_size
-
send_atomically
protected boolean send_atomically
-
reuse_message_batches
protected boolean reuse_message_batches
-
loopback
protected boolean loopback
-
DEFAULT_INITIAL_CAPACITY
protected static final int DEFAULT_INITIAL_CAPACITY
- See Also:
- Constant Field Values
-
DEFAULT_INCREMENT
protected static final int DEFAULT_INCREMENT
- See Also:
- Constant Field Values
-
num_msgs_sent
protected final java.util.concurrent.atomic.LongAdder num_msgs_sent
-
num_msgs_received
protected final java.util.concurrent.atomic.LongAdder num_msgs_received
-
num_acks_sent
protected final java.util.concurrent.atomic.LongAdder num_acks_sent
-
num_acks_received
protected final java.util.concurrent.atomic.LongAdder num_acks_received
-
num_xmits
protected final java.util.concurrent.atomic.LongAdder num_xmits
-
xmit_reqs_received
protected final java.util.concurrent.atomic.LongAdder xmit_reqs_received
-
xmit_reqs_sent
protected final java.util.concurrent.atomic.LongAdder xmit_reqs_sent
-
xmit_rsps_sent
protected final java.util.concurrent.atomic.LongAdder xmit_rsps_sent
-
avg_delivery_batch_size
protected final AverageMinMax avg_delivery_batch_size
-
sends_can_block
protected boolean sends_can_block
-
is_trace
protected boolean is_trace
-
relay_present
protected boolean relay_present
-
send_table
protected final java.util.Map<Address,ReliableUnicast.SenderEntry> send_table
-
recv_table
protected final java.util.Map<Address,ReliableUnicast.ReceiverEntry> recv_table
-
cached_batches
protected final java.util.Map<Address,MessageBatch> cached_batches
To cache batches for sending messages up the stack (https://issues.redhat.com/browse/JGRP-2841). Note: values are all _regular_ batches; OOB batches/messages have been passed up, or were removed when draining the table in removeAndDeliver()
-
recv_table_lock
protected final java.util.concurrent.locks.ReentrantLock recv_table_lock
-
xmit_task_map
protected final java.util.Map<Address,java.lang.Long> xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per member (applicable only for received messages (ReceiverEntry)): https://issues.redhat.com/browse/JGRP-1539
-
xmit_task
protected java.util.concurrent.Future<?> xmit_task
RetransmitTask running every xmit_interval ms
-
members
protected volatile java.util.List<Address> members
-
timer
protected TimeScheduler timer
-
running
protected volatile boolean running
-
last_conn_id
protected short last_conn_id
-
cache
protected AgeOutCache<Address> cache
-
time_service
protected TimeService time_service
-
timestamper
protected final java.util.concurrent.atomic.AtomicInteger timestamper
-
last_sync_sent
protected ExpiryCache<Address> last_sync_sent
Keep track of when a SEND_FIRST_SEQNO message was sent to a given sender
-
num_loopbacks
protected final java.util.concurrent.atomic.LongAdder num_loopbacks
-
msg_cache
protected final MessageCache msg_cache
-
DUMMY_OOB_MSG
protected static final Message DUMMY_OOB_MSG
-
drop_oob_and_dont_loopback_msgs_filter
protected final java.util.function.Predicate<Message> drop_oob_and_dont_loopback_msgs_filter
-
remove_filter
protected static final java.util.function.Predicate<Message> remove_filter
-
BATCH_ACCUMULATOR
protected static final java.util.function.BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR
-
-
Method Detail
-
sendOptions
protected Buffer.Options sendOptions()
-
needToSendAck
protected abstract boolean needToSendAck(ReliableUnicast.Entry e, int num_acks)
-
getNumLoopbacks
public long getNumLoopbacks()
-
getNumSendConnections
public int getNumSendConnections()
-
getNumReceiveConnections
public int getNumReceiveConnections()
-
getNumConnections
public int getNumConnections()
-
getTimestamper
public int getTimestamper()
-
setLevel
public <T extends Protocol> T setLevel(java.lang.String level)
Description copied from class:Protocol
Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
-
getXmitInterval
public long getXmitInterval()
-
setXmitInterval
public ReliableUnicast setXmitInterval(long i)
-
isXmitsEnabled
public boolean isXmitsEnabled()
-
setXmitsEnabled
public ReliableUnicast setXmitsEnabled(boolean b)
-
getConnExpiryTimeout
public long getConnExpiryTimeout()
-
setConnExpiryTimeout
public ReliableUnicast setConnExpiryTimeout(long c)
-
getConnCloseTimeout
public long getConnCloseTimeout()
-
setConnCloseTimeout
public ReliableUnicast setConnCloseTimeout(long c)
-
logNotFoundMsgs
public boolean logNotFoundMsgs()
-
logNotFoundMsgs
public ReliableUnicast logNotFoundMsgs(boolean l)
-
getSyncMinInterval
public long getSyncMinInterval()
-
setSyncMinInterval
public ReliableUnicast setSyncMinInterval(long s)
-
getMaxXmitReqSize
public int getMaxXmitReqSize()
-
setMaxXmitReqSize
public ReliableUnicast setMaxXmitReqSize(int m)
-
reuseMessageBatches
public boolean reuseMessageBatches()
-
reuseMessageBatches
public ReliableUnicast reuseMessageBatches(boolean b)
-
sendsCanBlock
public boolean sendsCanBlock()
-
sendsCanBlock
public ReliableUnicast sendsCanBlock(boolean s)
-
sendAtomically
public boolean sendAtomically()
-
sendAtomically
public ReliableUnicast sendAtomically(boolean f)
-
loopback
public boolean loopback()
-
loopback
public ReliableUnicast loopback(boolean b)
-
timeService
public ReliableUnicast timeService(TimeService ts)
-
lastSync
public ReliableUnicast lastSync(ExpiryCache<Address> c)
-
printConnections
public java.lang.String printConnections()
-
printCachedBatches
public java.lang.String printCachedBatches()
-
clearCachedBatches
public ReliableUnicast clearCachedBatches()
-
trimCachedBatches
public ReliableUnicast trimCachedBatches()
-
getNumMessagesSent
@Deprecated public long getNumMessagesSent()
Deprecated.Don't remove! https://issues.redhat.com/browse/JGRP-2814
-
getNumMessagesReceived
@Deprecated public long getNumMessagesReceived()
Deprecated.Don't remove! https://issues.redhat.com/browse/JGRP-2814
-
getNumAcksSent
public long getNumAcksSent()
-
getNumAcksReceived
public long getNumAcksReceived()
-
getNumXmits
public long getNumXmits()
-
getMaxRetransmitTime
public long getMaxRetransmitTime()
-
setMaxRetransmitTime
public ReliableUnicast setMaxRetransmitTime(long max_retransmit_time)
-
isXmitTaskRunning
public boolean isXmitTaskRunning()
-
getAgeOutCacheSize
public int getAgeOutCacheSize()
-
printAgeOutCache
public java.lang.String printAgeOutCache()
-
getAgeOutCache
public AgeOutCache<Address> getAgeOutCache()
-
hasSendConnectionTo
public boolean hasSendConnectionTo(Address dest)
Used for testing only
-
getNumUnackedMessages
public int getNumUnackedMessages()
The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)
-
getXmitTableUndeliveredMessages
public int getXmitTableUndeliveredMessages()
-
getXmitTableMissingMessages
public int getXmitTableMissingMessages()
-
getXmitTableDeliverableMessages
public int getXmitTableDeliverableMessages()
-
printReceiveWindowMessages
public java.lang.String printReceiveWindowMessages()
-
printSendWindowMessages
public java.lang.String printSendWindowMessages()
-
resetStats
public void resetStats()
- Overrides:
resetStats
in classProtocol
-
init
public void init() throws java.lang.Exception
Description copied from class:Protocol
Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
-
start
public void start() throws java.lang.Exception
Description copied from class:Protocol
This method is called on aJChannel.connect(String)
; starts work. Protocols are connected ready to receive events. Will be called from bottom to top.- Specified by:
start
in interfaceLifecycle
- Overrides:
start
in classProtocol
- Throws:
java.lang.Exception
- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, soJChannel.connect(String)
will throw an exception
-
stop
public void stop()
Description copied from class:Protocol
Called on aJChannel.disconnect()
; stops work (e.g. by closing multicast socket). Will be called from top to bottom.
-
handleUpEvent
protected void handleUpEvent(Address sender, Message msg, UnicastHeader hdr)
-
up
public java.lang.Object up(Message msg)
Description copied from class:Protocol
A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
up
public void up(MessageBatch batch)
Description copied from class:Protocol
Sends up a multiple messages in aMessageBatch
. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages. The default processing below sends messages up the stack individually, based on a matching criteria (callingProtocol.accept(Message)
), and - if true - callsProtocol.up(org.jgroups.Event)
for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped. Subclasses should check if there are any messages destined for them (e.g. usingMessageBatch.iterator(Predicate)
), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
-
handleBatchFromSelf
protected void handleBatchFromSelf(MessageBatch batch, ReliableUnicast.Entry entry)
-
down
public java.lang.Object down(Event evt)
Description copied from class:Protocol
An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack usingdown_prot.down()
.
-
down
public java.lang.Object down(Message msg)
Description copied from class:Protocol
A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
-
isLocalSiteMaster
protected boolean isLocalSiteMaster(Address dest)
-
isLocal
protected boolean isLocal(Address addr)
-
closeConnection
public void closeConnection(Address mbr)
Removes and resets from connection table (which is already locked). Returns true if member was found, otherwise false. This method is public only so it can be invoked by unit testing, but should not be used !
-
closeSendConnection
public void closeSendConnection(Address mbr)
-
closeReceiveConnection
public void closeReceiveConnection(Address mbr)
-
removeSendConnection
public void removeSendConnection(Address mbr)
-
removeReceiveConnection
public void removeReceiveConnection(Address mbr)
-
removeAllConnections
public void removeAllConnections()
This method is public only so it can be invoked by unit testing, but should not otherwise be used !
-
retransmit
protected void retransmit(SeqnoList missing, Address sender, Address real_dest)
Sends a retransmit request to the given sender
-
retransmit
protected void retransmit(Message msg)
Called by the sender to resend messages for which no ACK has been received yet
-
expired
public void expired(Address key)
Called by AgeOutCache, to removed expired connections- Specified by:
expired
in interfaceAgeOutCache.Handler<Address>
-
handleDataReceived
protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg)
Check whether the hashtable contains an entry e forsender
(create if not). If e.received_msgs is null andfirst
is true: create a new AckReceiverWindow(seqno) and add message. Set e.received_msgs to the new window. Else just add the message.
-
addMessage
protected void addMessage(ReliableUnicast.ReceiverEntry entry, Address sender, long seqno, Message msg)
-
addQueuedMessages
protected void addQueuedMessages(Address sender, ReliableUnicast.ReceiverEntry entry, java.util.Collection<Message> queued_msgs)
-
handleDataReceivedFromSelf
protected void handleDataReceivedFromSelf(Address sender, long seqno, Message msg)
Called when the sender of a message is the local member. In this case, we don't need to add the message to the table as the sender already did that
-
handleBatchReceived
protected void handleBatchReceived(ReliableUnicast.ReceiverEntry entry, Address sender, java.util.List<LongTuple<Message>> msgs, boolean oob, Address original_dest)
-
removeAndDeliver
protected void removeAndDeliver(ReliableUnicast.Entry entry, Address sender, AsciiString cluster, int min_size)
Try to remove as many messages as possible from the table as pass them up. Prevents concurrent passing up of messages by different threads (https://issues.redhat.com/browse/JGRP-198); lots of threads can come up to this point concurrently, but only 1 is allowed to pass at a time. We *can* deliver messages from *different* senders concurrently, e.g. reception of P1, Q1, P2, Q2 can result in delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered in the order in which they were sent
-
printMessageList
protected java.lang.String printMessageList(java.util.List<LongTuple<Message>> list)
-
getReceiverEntry
protected ReliableUnicast.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest)
-
_getReceiverEntry
public ReliableUnicast.ReceiverEntry _getReceiverEntry(Address sender, long seqno, boolean first, short conn_id, Address real_dest)
-
compareConnIds
protected ReliableUnicast.ReceiverEntry compareConnIds(short other, short mine, boolean first, ReliableUnicast.ReceiverEntry e, Address sender, long seqno, Address real_dest)
-
getSenderEntry
protected ReliableUnicast.SenderEntry getSenderEntry(Address dst)
-
createReceiverEntry
protected ReliableUnicast.ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id, Address dest)
-
handleAckReceived
protected void handleAckReceived(Address sender, long seqno, short conn_id, int timestamp)
Add the ACK to hashtable.sender.sent_msgs
-
handleResendingOfFirstMessage
protected void handleResendingOfFirstMessage(Address sender, int timestamp)
We need to resend the first message with our conn_id
-
send
protected boolean send(Message msg, ReliableUnicast.SenderEntry entry, boolean dont_loopback_set, boolean dont_block)
-
addToSendBuffer
protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, java.util.function.Predicate<Message> filter, boolean dont_block)
Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed.- Returns:
- True if added successfully. False if not, e.g. no space in buffer and DONT_BLOCK set, or message already present, or seqno lower than buffer.low
-
resend
protected void resend(Message msg)
-
deliverBatch
protected void deliverBatch(MessageBatch batch, ReliableUnicast.Entry entry, Address original_dest)
-
getTimestamp
protected long getTimestamp()
-
startRetransmitTask
public void startRetransmitTask()
-
stopRetransmitTask
public void stopRetransmitTask()
-
isCallerRunsHandler
protected static boolean isCallerRunsHandler(java.util.concurrent.RejectedExecutionHandler h)
-
sendAck
protected void sendAck(Address dst, ReliableUnicast.Entry entry, Address real_dest)
-
getNewConnectionId
protected short getNewConnectionId()
-
sendRequestForFirstSeqno
protected void sendRequestForFirstSeqno(Address dest, Address original_dest)
-
sendClose
public void sendClose(Address dest, short conn_id)
-
closeIdleConnections
public void closeIdleConnections()
-
removeExpiredConnections
public int removeExpiredConnections()
-
removeConnections
public int removeConnections(boolean remove_send_connections, boolean remove_receive_connections)
Removes send- and/or receive-connections whose state is not OPEN (CLOSING or CLOSED).- Parameters:
remove_send_connections
- If true, send connections whose state is !OPEN are destroyed and removedremove_receive_connections
- If true, receive connections with state !OPEN are destroyed and removed- Returns:
- The number of connections which were removed
-
triggerXmit
public void triggerXmit()
-
sendPendingAcks
public void sendPendingAcks()
-
update
protected void update(ReliableUnicast.Entry entry, int num_received)
-
compare
protected static int compare(int ts1, int ts2)
Compares 2 timestamps, handles numeric overflow
-
accumulate
@SafeVarargs protected static int accumulate(java.util.function.ToIntFunction<Buffer<Message>> func, java.util.Collection<? extends ReliableUnicast.Entry>... entries)
-
max
protected static short max(short a, short b)
-
-