Package org.jgroups.protocols.pbcast
Class STABLE
- java.lang.Object
-
- org.jgroups.stack.Protocol
-
- org.jgroups.protocols.pbcast.STABLE
-
- All Implemented Interfaces:
Lifecycle
public class STABLE extends Protocol
Computes the broadcast messages that are stable; i.e., have been delivered by all members. Sends STABLE events down the stack when this is the case. This allows NAKACK{2,3} to garbage collect messages that have been seen by all members.Works as follows: periodically (desired_avg_gossip) or when having received a number of bytes (max_bytes), every member sends its digest (highest seqno delivered, received) to the current coordinator
The coordinator updates a stability vector, which maintains the highest seqno delivered/receive for each member and initially contains no data, when such a message is received.
When messages from all members have been received, a stability message is mcast, which causes all members to send a STABLE event down the stack (triggering garbage collection in the NAKACK{2,3} layer).- Author:
- Bela Ban
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
STABLE.ResumeTask
static class
STABLE.StableHeader
protected class
STABLE.StableTask
Mcast periodic STABLE message.
-
Field Summary
Fields Modifier and Type Field Description protected Address
coordinator
protected long
desired_avg_gossip
Sends a STABLE gossip every 20 seconds on average.protected MutableDigest
digest
protected boolean
initialized
protected java.util.concurrent.locks.Lock
lock
protected long
max_bytes
Total amount of bytes from incoming messages (default = 0 = disabled).protected static long
MAX_SUSPEND_TIME
protected long
num_bytes_received
The total number of bytes received from unicast and multicast messagesprotected java.util.concurrent.atomic.LongAdder
num_stability_msgs_received
protected java.util.concurrent.atomic.LongAdder
num_stability_msgs_sent
protected java.util.concurrent.atomic.LongAdder
num_stable_msgs_received
protected java.util.concurrent.atomic.LongAdder
num_stable_msgs_sent
protected java.util.concurrent.locks.Lock
received
protected java.util.concurrent.Future<?>
resume_task_future
protected java.lang.Object
resume_task_mutex
protected java.util.concurrent.Future<?>
stable_task_future
protected java.util.concurrent.locks.Lock
stable_task_lock
protected boolean
suspended
When true, don't take part in garbage collection: neither send STABLE messages nor handle STABILITY messagesprotected TimeScheduler
timer
protected View
view
protected FixedSizeBitSet
votes
Keeps track of who we already heard from (STABLE_GOSSIP msgs).-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, local_addr, log, policies, stack, stats, up_prot
-
-
Constructor Summary
Constructors Constructor Description STABLE()
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description protected boolean
addVote(int rank)
Adds mbr to votes and returns true if we have all the votes, otherwise false.protected static boolean
allVotesReceived(FixedSizeBitSet votes)
Votes is already locked and guaranteed to be non-nulljava.lang.Object
down(Event evt)
An event is to be sent down the stack.java.lang.Object
down(Message msg)
A message is sent down the stack.void
gc()
long
getBytes()
long
getDesiredAverageGossip()
protected Digest
getDigest()
long
getMaxBytes()
int
getNumVotes()
protected static int
getRank(Address member, View v)
long
getStabilityReceived()
long
getStabilitySent()
long
getStableReceived()
long
getStableSent()
boolean
getStableTaskRunning()
protected java.lang.Object
handle(STABLE.StableHeader hdr, Address sender, Digest digest)
protected void
handleRegularMessage(Message msg)
protected void
handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id)
protected void
handleStableMessage(Digest d, Address sender, ViewId view_id)
Digest d contains (a) the highest seqnos deliverable for each sender and (b) the highest seqnos seen for each member.protected void
handleViewChange(View v)
void
init()
Called after a protocol has been created and before the protocol is started.protected boolean
maxBytesExceeded(int len)
java.lang.String
printDigest()
protected java.lang.String
printDigest(Digest digest)
java.lang.String
printVotes()
java.util.List<java.lang.Integer>
requiredDownServices()
List of events that are required to be answered by some layer belowprotected void
resetDigest()
protected void
resetNumBytes()
void
resetStats()
protected void
resume()
protected void
sendStabilityMessage(Digest d, ViewId view_id)
Sends a stability message to all members except self.protected void
sendStableMessage(boolean send_in_background)
Broadcasts a STABLE message of the current digest to all members (or the coordinator only).STABLE
setDesiredAverageGossip(long g)
STABLE
setMaxBytes(long m)
void
start()
This method is called on aJChannel.connect(String)
; starts work.protected void
startResumeTask(long max_suspend_time)
protected void
startStableTask()
void
stop()
Called on aJChannel.disconnect()
; stops work (e.g.protected void
stopResumeTask()
protected void
stopStableTask()
protected void
suspend(long timeout)
java.lang.Object
up(Event evt)
An event was received from the protocol below.java.lang.Object
up(Message msg)
A single message was received.void
up(MessageBatch batch)
Sends up a multiple messages in aMessageBatch
.protected void
updateLocalDigest(Digest d, Address sender)
Update my own digest from a digest received by somebody else.-
Methods inherited from class org.jgroups.stack.Protocol
accept, addPolicy, addr, addr, afterCreationHook, destroy, down, enableStats, getAddress, getComponents, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getPolicies, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, policies, providedDownServices, providedUpServices, removePolicy, requiredUpServices, resetStatistics, setAddress, setDownProtocol, setErgonomics, setId, setLevel, setPolicies, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, toString
-
-
-
-
Field Detail
-
MAX_SUSPEND_TIME
protected static final long MAX_SUSPEND_TIME
- See Also:
- Constant Field Values
-
desired_avg_gossip
protected long desired_avg_gossip
Sends a STABLE gossip every 20 seconds on average. 0 disables gossiping of STABLE messages
-
max_bytes
protected long max_bytes
Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE message will be broadcast andnum_bytes_received
reset to 0 . If this is > 0, then ideallystability_delay
should be set to a low number as well
-
num_stable_msgs_sent
protected final java.util.concurrent.atomic.LongAdder num_stable_msgs_sent
-
num_stable_msgs_received
protected final java.util.concurrent.atomic.LongAdder num_stable_msgs_received
-
num_stability_msgs_sent
protected final java.util.concurrent.atomic.LongAdder num_stability_msgs_sent
-
num_stability_msgs_received
protected final java.util.concurrent.atomic.LongAdder num_stability_msgs_received
-
view
protected volatile View view
-
digest
protected volatile MutableDigest digest
-
votes
protected FixedSizeBitSet votes
Keeps track of who we already heard from (STABLE_GOSSIP msgs). This is all 0's, and we set the sender when a STABLE message is received. When the bitset is all 1's (responses from all members), we send a STABILITY message
-
lock
protected final java.util.concurrent.locks.Lock lock
-
stable_task_future
protected java.util.concurrent.Future<?> stable_task_future
-
stable_task_lock
protected final java.util.concurrent.locks.Lock stable_task_lock
-
timer
protected TimeScheduler timer
-
num_bytes_received
protected long num_bytes_received
The total number of bytes received from unicast and multicast messages
-
received
protected final java.util.concurrent.locks.Lock received
-
suspended
protected volatile boolean suspended
When true, don't take part in garbage collection: neither send STABLE messages nor handle STABILITY messages
-
initialized
protected boolean initialized
-
resume_task_future
protected java.util.concurrent.Future<?> resume_task_future
-
resume_task_mutex
protected final java.lang.Object resume_task_mutex
-
coordinator
protected volatile Address coordinator
-
-
Method Detail
-
getDesiredAverageGossip
public long getDesiredAverageGossip()
-
setDesiredAverageGossip
public STABLE setDesiredAverageGossip(long g)
-
getMaxBytes
public long getMaxBytes()
-
setMaxBytes
public STABLE setMaxBytes(long m)
-
getBytes
public long getBytes()
-
getNumVotes
public int getNumVotes()
-
getStableReceived
public long getStableReceived()
-
getStableSent
public long getStableSent()
-
getStabilityReceived
public long getStabilityReceived()
-
getStabilitySent
public long getStabilitySent()
-
getStableTaskRunning
public boolean getStableTaskRunning()
-
gc
public void gc()
-
printDigest
public java.lang.String printDigest()
-
printVotes
public java.lang.String printVotes()
-
resetStats
public void resetStats()
- Overrides:
resetStats
in classProtocol
-
requiredDownServices
public java.util.List<java.lang.Integer> requiredDownServices()
Description copied from class:Protocol
List of events that are required to be answered by some layer below- Overrides:
requiredDownServices
in classProtocol
-
suspend
protected void suspend(long timeout)
-
resume
protected void resume()
-
init
public void init() throws java.lang.Exception
Description copied from class:Protocol
Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
-
start
public void start() throws java.lang.Exception
Description copied from class:Protocol
This method is called on aJChannel.connect(String)
; starts work. Protocols are connected ready to receive events. Will be called from bottom to top.- Specified by:
start
in interfaceLifecycle
- Overrides:
start
in classProtocol
- Throws:
java.lang.Exception
- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, soJChannel.connect(String)
will throw an exception
-
stop
public void stop()
Description copied from class:Protocol
Called on aJChannel.disconnect()
; stops work (e.g. by closing multicast socket). Will be called from top to bottom.
-
up
public java.lang.Object up(Event evt)
Description copied from class:Protocol
An event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally, the event is either a) discarded, or b) an event is sent down the stack usingdown_prot.down()
or c) the event (or another event) is sent up the stack usingup_prot.up()
.
-
up
public java.lang.Object up(Message msg)
Description copied from class:Protocol
A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
up
public void up(MessageBatch batch)
Description copied from class:Protocol
Sends up a multiple messages in aMessageBatch
. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages. The default processing below sends messages up the stack individually, based on a matching criteria (callingProtocol.accept(Message)
), and - if true - callsProtocol.up(org.jgroups.Event)
for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped. Subclasses should check if there are any messages destined for them (e.g. usingMessageBatch.iterator(Predicate)
), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
-
down
public java.lang.Object down(Message msg)
Description copied from class:Protocol
A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
-
down
public java.lang.Object down(Event evt)
Description copied from class:Protocol
An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack usingdown_prot.down()
.
-
handle
protected java.lang.Object handle(STABLE.StableHeader hdr, Address sender, Digest digest)
-
handleRegularMessage
protected void handleRegularMessage(Message msg)
-
maxBytesExceeded
protected boolean maxBytesExceeded(int len)
-
handleViewChange
protected void handleViewChange(View v)
-
updateLocalDigest
protected void updateLocalDigest(Digest d, Address sender)
Update my own digest from a digest received by somebody else. Returns whether the update was successful. Needs to be called with a lock on digest
-
resetDigest
protected void resetDigest()
-
addVote
protected boolean addVote(int rank)
Adds mbr to votes and returns true if we have all the votes, otherwise false.- Parameters:
rank
-
-
allVotesReceived
protected static boolean allVotesReceived(FixedSizeBitSet votes)
Votes is already locked and guaranteed to be non-null
-
startStableTask
protected void startStableTask()
-
stopStableTask
protected void stopStableTask()
-
startResumeTask
protected void startResumeTask(long max_suspend_time)
-
stopResumeTask
protected void stopResumeTask()
-
handleStableMessage
protected void handleStableMessage(Digest d, Address sender, ViewId view_id)
Digest d contains (a) the highest seqnos deliverable for each sender and (b) the highest seqnos seen for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability message, which results in garbage collection of messages lower than the ones in the stability vector. The maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN for details).
-
resetNumBytes
protected void resetNumBytes()
-
handleStabilityMessage
protected void handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id)
-
sendStableMessage
protected void sendStableMessage(boolean send_in_background)
Broadcasts a STABLE message of the current digest to all members (or the coordinator only). The message contains the highest seqno delivered and received for all members. The seqnos are retrieved from the NAKACK layer below.
-
sendStabilityMessage
protected void sendStabilityMessage(Digest d, ViewId view_id)
Sends a stability message to all members except self.- Parameters:
d
- A copy of the stability digest, so we don't need to copy it again
-
getDigest
protected Digest getDigest()
-
printDigest
protected java.lang.String printDigest(Digest digest)
-
-