Package org.jgroups.protocols.pbcast
Class StreamingStateTransfer
- java.lang.Object
-
- org.jgroups.stack.Protocol
-
- org.jgroups.protocols.pbcast.StreamingStateTransfer
-
- All Implemented Interfaces:
Lifecycle
,ProcessingQueue.Handler<Address>
- Direct Known Subclasses:
STATE
,STATE_SOCK
public abstract class StreamingStateTransfer extends Protocol implements ProcessingQueue.Handler<Address>
Base class for state transfer protocols which use streaming (or chunking) to transfer state between two members. The major advantage of this approach is that transferring application state to a joining member of a group does not entail loading of the complete application state into memory. The application state, for example, might be located entirely on some form of disk based storage. The defaultSTATE_TRANSFER
protocol requires this state to be loaded entirely into memory before being transferred to a group member while the streaming state transfer protocols do not. Thus the streaming state transfer protocols are able to transfer application state that is very large (>1Gb) without a likelihood of the such transfer resulting in OutOfMemoryException. Note that prior to 3.0, there was only 1 streaming protocol: STATE. In 3.0 the functionality was split between STATE and STATE_SOCK, and common functionality moved up into StreamingStateTransfer.- Since:
- 3.0
- Author:
- Bela Ban, Vladimir Blagojevic
- See Also:
STATE_TRANSFER
,STATE
,STATE_SOCK
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
StreamingStateTransfer.StateGetter
Thread which invokesReceiver.getState(java.io.OutputStream)
in the applicationstatic class
StreamingStateTransfer.StateHeader
-
Field Summary
Fields Modifier and Type Field Description protected double
avg_state_size
protected int
buffer_size
protected int
max_pool
protected java.util.List<Address>
members
protected java.util.concurrent.atomic.LongAdder
num_bytes_sent
protected java.util.concurrent.atomic.LongAdder
num_state_reqs
protected long
pool_thread_keep_alive
protected Address
state_provider
protected ProcessingQueue<Address>
state_requesters
List of members requesting state.protected java.util.concurrent.ThreadPoolExecutor
thread_pool
Thread pool (configured withmax_pool
andpool_thread_keep_alive
) to runStreamingStateTransfer.StateGetter
threads on-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, local_addr, log, policies, stack, stats, up_prot
-
-
Constructor Summary
Constructors Constructor Description StreamingStateTransfer()
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description protected void
close(java.lang.Object resource)
void
closeBarrierAndSuspendStable()
protected void
closeHoleFor(Address member)
protected abstract Tuple<java.io.InputStream,java.lang.Object>
createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr)
Creates an InputStream to the state provider to read the state.protected void
createStreamToRequester(Address requester)
Creates an OutputStream to the state requester to write the stateprotected java.util.concurrent.ThreadPoolExecutor
createThreadPool()
void
destroy()
This method is called on aJChannel.close()
.protected Address
determineCoordinator()
java.lang.Object
down(Event evt)
An event is to be sent down the stack.double
getAverageStateSize()
long
getNumberOfStateBytesSent()
long
getNumberOfStateRequests()
protected void
getStateFromApplication(Address requester, java.io.OutputStream out, boolean use_separate_thread)
long
getThreadPoolCompletedTasks()
int
getThreadPoolSize()
void
handle(Address state_requester)
protected java.lang.Object
handle(StreamingStateTransfer.StateHeader hdr, Message msg)
protected void
handleConfig(java.util.Map<java.lang.String,java.lang.Object> config)
protected void
handleEOF(Address sender)
protected void
handleException(java.lang.Throwable exception)
protected void
handleStateChunk(Address sender, byte[] buffer, int offset, int length)
protected void
handleStateReq(Address requester)
protected void
handleStateRsp(Address provider, StreamingStateTransfer.StateHeader hdr)
protected void
handleViewChange(View v)
void
init()
Called after a protocol has been created and before the protocol is started.protected void
modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)
protected void
openBarrier()
void
openBarrierAndResumeStable()
protected void
punchHoleFor(Address member)
java.util.List<java.lang.Integer>
requiredDownServices()
List of events that are required to be answered by some layer belowvoid
resetStats()
protected void
resumeStable()
protected void
sendEof(Address requester)
protected void
sendException(Address requester, java.lang.Throwable exception)
protected void
setStateInApplication(java.io.InputStream in, java.lang.Object resource, Address provider)
void
start()
This method is called on aJChannel.connect(String)
; starts work.void
stop()
Called on aJChannel.disconnect()
; stops work (e.g.java.lang.Object
up(Event evt)
An event was received from the protocol below.java.lang.Object
up(Message msg)
A single message was received.void
up(MessageBatch batch)
Sends up a multiple messages in aMessageBatch
.protected boolean
useAsyncStateDelivery()
-
Methods inherited from class org.jgroups.stack.Protocol
accept, addPolicy, addr, addr, afterCreationHook, down, down, enableStats, getAddress, getComponents, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getPolicies, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, policies, providedDownServices, providedUpServices, removePolicy, requiredUpServices, resetStatistics, setAddress, setDownProtocol, setErgonomics, setId, setLevel, setPolicies, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, toString
-
-
-
-
Field Detail
-
buffer_size
protected int buffer_size
-
max_pool
protected int max_pool
-
pool_thread_keep_alive
protected long pool_thread_keep_alive
-
num_state_reqs
protected final java.util.concurrent.atomic.LongAdder num_state_reqs
-
num_bytes_sent
protected final java.util.concurrent.atomic.LongAdder num_bytes_sent
-
avg_state_size
protected double avg_state_size
-
state_provider
protected volatile Address state_provider
-
members
protected final java.util.List<Address> members
-
thread_pool
protected java.util.concurrent.ThreadPoolExecutor thread_pool
Thread pool (configured withmax_pool
andpool_thread_keep_alive
) to runStreamingStateTransfer.StateGetter
threads on
-
state_requesters
protected final ProcessingQueue<Address> state_requesters
List of members requesting state. Only a single state request is handled at any time
-
-
Method Detail
-
getNumberOfStateRequests
public long getNumberOfStateRequests()
-
getNumberOfStateBytesSent
public long getNumberOfStateBytesSent()
-
getAverageStateSize
public double getAverageStateSize()
-
getThreadPoolSize
public int getThreadPoolSize()
-
getThreadPoolCompletedTasks
public long getThreadPoolCompletedTasks()
-
requiredDownServices
public java.util.List<java.lang.Integer> requiredDownServices()
Description copied from class:Protocol
List of events that are required to be answered by some layer below- Overrides:
requiredDownServices
in classProtocol
-
resetStats
public void resetStats()
- Overrides:
resetStats
in classProtocol
-
init
public void init() throws java.lang.Exception
Description copied from class:Protocol
Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
-
destroy
public void destroy()
Description copied from class:Protocol
This method is called on aJChannel.close()
. Does some cleanup; after the call, the VM will terminate
-
start
public void start() throws java.lang.Exception
Description copied from class:Protocol
This method is called on aJChannel.connect(String)
; starts work. Protocols are connected ready to receive events. Will be called from bottom to top.- Specified by:
start
in interfaceLifecycle
- Overrides:
start
in classProtocol
- Throws:
java.lang.Exception
- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, soJChannel.connect(String)
will throw an exception
-
stop
public void stop()
Description copied from class:Protocol
Called on aJChannel.disconnect()
; stops work (e.g. by closing multicast socket). Will be called from top to bottom.
-
down
public java.lang.Object down(Event evt)
Description copied from class:Protocol
An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack usingdown_prot.down()
.
-
up
public java.lang.Object up(Event evt)
Description copied from class:Protocol
An event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally, the event is either a) discarded, or b) an event is sent down the stack usingdown_prot.down()
or c) the event (or another event) is sent up the stack usingup_prot.up()
.
-
up
public java.lang.Object up(Message msg)
Description copied from class:Protocol
A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
up
public void up(MessageBatch batch)
Description copied from class:Protocol
Sends up a multiple messages in aMessageBatch
. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages. The default processing below sends messages up the stack individually, based on a matching criteria (callingProtocol.accept(Message)
), and - if true - callsProtocol.up(org.jgroups.Event)
for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped. Subclasses should check if there are any messages destined for them (e.g. usingMessageBatch.iterator(Predicate)
), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
-
handle
protected java.lang.Object handle(StreamingStateTransfer.StateHeader hdr, Message msg)
-
handleConfig
protected void handleConfig(java.util.Map<java.lang.String,java.lang.Object> config)
-
handleStateChunk
protected void handleStateChunk(Address sender, byte[] buffer, int offset, int length)
-
handleEOF
protected void handleEOF(Address sender)
-
handleException
protected void handleException(java.lang.Throwable exception)
-
getStateFromApplication
protected void getStateFromApplication(Address requester, java.io.OutputStream out, boolean use_separate_thread)
-
setStateInApplication
protected void setStateInApplication(java.io.InputStream in, java.lang.Object resource, Address provider)
-
closeBarrierAndSuspendStable
public void closeBarrierAndSuspendStable()
-
openBarrierAndResumeStable
public void openBarrierAndResumeStable()
-
openBarrier
protected void openBarrier()
-
resumeStable
protected void resumeStable()
-
sendEof
protected void sendEof(Address requester)
-
sendException
protected void sendException(Address requester, java.lang.Throwable exception)
-
createThreadPool
protected java.util.concurrent.ThreadPoolExecutor createThreadPool()
-
determineCoordinator
protected Address determineCoordinator()
-
handleViewChange
protected void handleViewChange(View v)
-
handle
public void handle(Address state_requester) throws java.lang.Exception
- Specified by:
handle
in interfaceProcessingQueue.Handler<Address>
- Throws:
java.lang.Exception
-
handleStateReq
protected void handleStateReq(Address requester) throws java.lang.Exception
- Throws:
java.lang.Exception
-
createStreamToRequester
protected void createStreamToRequester(Address requester)
Creates an OutputStream to the state requester to write the state
-
createStreamToProvider
protected abstract Tuple<java.io.InputStream,java.lang.Object> createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr) throws java.lang.Exception
Creates an InputStream to the state provider to read the state. Return the input stream and a handback object as a tuple. The handback object is handed back to the subclass when done, or in case of an error (e.g. to clean up resources)- Throws:
java.lang.Exception
-
close
protected void close(java.lang.Object resource)
-
useAsyncStateDelivery
protected boolean useAsyncStateDelivery()
-
modifyStateResponseHeader
protected void modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)
-
handleStateRsp
protected void handleStateRsp(Address provider, StreamingStateTransfer.StateHeader hdr)
-
punchHoleFor
protected void punchHoleFor(Address member)
-
closeHoleFor
protected void closeHoleFor(Address member)
-
-