Package org.jgroups.protocols.pbcast
Class StreamingStateTransfer
- java.lang.Object
-
- org.jgroups.stack.Protocol
-
- org.jgroups.protocols.pbcast.StreamingStateTransfer
-
- All Implemented Interfaces:
ProcessingQueue.Handler<Address>
- Direct Known Subclasses:
STATE, STATE_SOCK
public abstract class StreamingStateTransfer extends Protocol implements ProcessingQueue.Handler<Address>
Base class for state transfer protocols which use streaming (or chunking) to transfer state between two members. The major advantage of this approach is that transferring application state to a joining member of a group does not require loading the complete application state into memory. The application state might, for example, be located entirely on some form of disk-based storage. The default STATE_TRANSFER protocol requires this state to be loaded entirely into memory before it is transferred to a group member, while the streaming state transfer protocols do not. Thus the streaming state transfer protocols can transfer very large application state (>1 GB) without the transfer being likely to result in an OutOfMemoryError. Note that prior to 3.0 there was only one streaming protocol: STATE. In 3.0 the functionality was split between STATE and STATE_SOCK, and the common functionality was moved up into StreamingStateTransfer.
- Since:
- 3.0
- Author:
- Bela Ban, Vladimir Blagojevic
- See Also:
STATE_TRANSFER, STATE, STATE_SOCK
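The memory argument in the class description can be illustrated with a minimal sketch of chunked copying. This is not the JGroups implementation; `ChunkedStateCopy` and its `copy` method are hypothetical names, and the `bufferSize` parameter only plays a role similar to what the `buffer_size` field presumably controls. The point is that memory use is bounded by the buffer, not by the size of the state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper, not part of JGroups: moves state in fixed-size chunks.
class ChunkedStateCopy {

    /** Copies all bytes from in to out through a buffer of bufferSize bytes; returns the byte count. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        if (bufferSize <= 0)
            throw new IllegalArgumentException("bufferSize must be positive");
        byte[] buf = new byte[bufferSize]; // the only allocation that scales with bufferSize
        long total = 0;
        int n;
        while ((n = in.read(buf, 0, buf.length)) != -1) {
            out.write(buf, 0, n); // forward the chunk immediately, then reuse the buffer
            total += n;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        // In-memory streams stand in for disk-backed state and a network stream.
        byte[] state = new byte[100_000];
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(state), sink, 8 * 1024);
        System.out.println("copied " + copied + " bytes");
    }
}
```

In a real application the source would typically be a file or database stream rather than a byte array, which is where the bounded buffer matters.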
-
-
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
protected class
StreamingStateTransfer.StateGetter
Thread which invokes StateListener.getState(java.io.OutputStream) in the application
static class
StreamingStateTransfer.StateHeader
-
Field Summary
Fields
Modifier and Type, Field, Description
protected double
avg_state_size
protected int
buffer_size
protected boolean
flushProtocolInStack
protected Address
local_addr
protected int
max_pool
protected java.util.List<Address>
members
protected java.util.concurrent.atomic.LongAdder
num_bytes_sent
protected java.util.concurrent.atomic.LongAdder
num_state_reqs
protected long
pool_thread_keep_alive
protected Address
state_provider
protected ProcessingQueue<Address>
state_requesters
List of members requesting state.
protected java.util.concurrent.ThreadPoolExecutor
thread_pool
Thread pool (configured with max_pool and pool_thread_keep_alive) to run StreamingStateTransfer.StateGetter threads on
-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, log, stack, stats, up_prot
-
-
Constructor Summary
Constructors
Constructor, Description
StreamingStateTransfer()
-
Method Summary
Modifier and Type, Method, Description
protected void
close(java.lang.Object resource)
void
closeBarrierAndSuspendStable()
protected void
closeHoleFor(Address member)
protected abstract Tuple<java.io.InputStream,java.lang.Object>
createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr)
Creates an InputStream to the state provider to read the state.
protected void
createStreamToRequester(Address requester)
Creates an OutputStream to the state requester to write the state.
protected java.util.concurrent.ThreadPoolExecutor
createThreadPool()
void
destroy()
This method is called on a JChannel.close().
protected Address
determineCoordinator()
java.lang.Object
down(Event evt)
An event is to be sent down the stack.
double
getAverageStateSize()
long
getNumberOfStateBytesSent()
long
getNumberOfStateRequests()
protected void
getStateFromApplication(Address requester, java.io.OutputStream out, boolean use_separate_thread)
long
getThreadPoolCompletedTasks()
int
getThreadPoolSize()
void
handle(Address state_requester)
protected void
handleConfig(java.util.Map<java.lang.String,java.lang.Object> config)
protected void
handleEOF(Address sender)
protected void
handleException(java.lang.Throwable exception)
protected void
handleStateChunk(Address sender, byte[] buffer, int offset, int length)
protected void
handleStateReq(Address requester)
protected void
handleStateRsp(Address provider, StreamingStateTransfer.StateHeader hdr)
protected void
handleViewChange(View v)
void
init()
Called after instance has been created (null constructor) and before protocol is started.
protected boolean
isDigestNeeded()
When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt).
protected void
modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)
protected void
openBarrier()
void
openBarrierAndResumeStable()
protected void
punchHoleFor(Address member)
java.util.List<java.lang.Integer>
requiredDownServices()
List of events that are required to be answered by some layer below.
void
resetStats()
protected void
resumeStable()
protected void
sendEof(Address requester)
protected void
sendException(Address requester, java.lang.Throwable exception)
protected void
setStateInApplication(java.io.InputStream in, java.lang.Object resource, Address provider)
void
start()
This method is called on a JChannel.connect(String).
void
stop()
This method is called on a JChannel.disconnect().
java.lang.Object
up(Event evt)
An event was received from the protocol below.
java.lang.Object
up(Message msg)
A single message was received.
protected boolean
useAsyncStateDelivery()
-
Methods inherited from class org.jgroups.stack.Protocol
accept, afterCreationHook, down, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, providedDownServices, providedUpServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, up
-
-
-
-
Field Detail
-
buffer_size
protected int buffer_size
-
max_pool
protected int max_pool
-
pool_thread_keep_alive
protected long pool_thread_keep_alive
-
num_state_reqs
protected final java.util.concurrent.atomic.LongAdder num_state_reqs
-
num_bytes_sent
protected final java.util.concurrent.atomic.LongAdder num_bytes_sent
-
avg_state_size
protected double avg_state_size
-
local_addr
protected Address local_addr
-
state_provider
protected volatile Address state_provider
-
members
protected final java.util.List<Address> members
-
flushProtocolInStack
protected volatile boolean flushProtocolInStack
-
thread_pool
protected java.util.concurrent.ThreadPoolExecutor thread_pool
Thread pool (configured with max_pool and pool_thread_keep_alive) to run StreamingStateTransfer.StateGetter threads on
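As a rough illustration of how a pool might be built from these two settings, the sketch below creates a `ThreadPoolExecutor` from a maximum size and a keep-alive time. The class name `StateGetterPool`, the zero core size, the `SynchronousQueue`, the millisecond unit and the caller-runs rejection policy are all assumptions made for the example; the page does not say how `createThreadPool()` actually configures the pool.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory, not the JGroups createThreadPool() implementation.
class StateGetterPool {

    /**
     * Builds a pool that starts with no threads, grows to at most maxPool
     * threads, and retires idle threads after keepAliveMillis milliseconds.
     */
    static ThreadPoolExecutor create(int maxPool, long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            0,                          // assumed: no permanently running threads
            maxPool,                    // upper bound, analogous to max_pool
            keepAliveMillis,            // analogous to pool_thread_keep_alive
            TimeUnit.MILLISECONDS,      // assumed unit
            new SynchronousQueue<>());  // assumed: hand tasks directly to a thread
        // Assumed policy: if all threads are busy, run the task in the submitting thread.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return pool;
    }
}
```

With a direct hand-off queue the pool never buffers tasks, so the rejection policy decides what happens once `maxPool` threads are busy.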
-
state_requesters
protected final ProcessingQueue<Address> state_requesters
List of members requesting state. Only a single state request is handled at any time
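The "one request at a time" behaviour can be sketched with a small queue that lets only one thread drain it. This is a simplified stand-in written for illustration, not the `ProcessingQueue` source; the class name `SerialQueue` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical single-consumer queue: elements are handled strictly one at a time.
class SerialQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Consumer<T> handler;
    private boolean processing; // guarded by "this"

    SerialQueue(Consumer<T> handler) {
        this.handler = handler;
    }

    /** Enqueues an element; the calling thread drains the queue unless another thread already is. */
    void add(T element) {
        synchronized (this) {
            queue.add(element);
            if (processing)
                return; // the draining thread will pick this element up
            processing = true;
        }
        drain();
    }

    private void drain() {
        while (true) {
            T next;
            synchronized (this) {
                next = queue.poll();
                if (next == null) {
                    processing = false; // queue empty: give up the consumer role
                    return;
                }
            }
            try {
                handler.accept(next); // runs outside the lock, one element at a time
            } catch (RuntimeException e) {
                // Swallowed here so one failing request does not stall the queue;
                // a real implementation would log it.
            }
        }
    }
}
```

Because the handler runs outside the lock, other threads can keep enqueueing requesters while one transfer is in progress.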
-
-
Method Detail
-
getNumberOfStateRequests
public long getNumberOfStateRequests()
-
getNumberOfStateBytesSent
public long getNumberOfStateBytesSent()
-
getAverageStateSize
public double getAverageStateSize()
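The three statistics getters above are undocumented on this page. One plausible reading, sketched below, is that the average state size is the number of bytes sent divided by the number of state requests. That relationship is an assumption, and `TransferStats` with its methods is a hypothetical class; only the use of `LongAdder` for the two counters comes from the field declarations.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical statistics holder, assuming average = bytes sent / requests.
class TransferStats {
    private final LongAdder numStateReqs = new LongAdder();
    private final LongAdder numBytesSent = new LongAdder();

    /** Records one completed state transfer of the given size. */
    void recordTransfer(long bytes) {
        numStateReqs.increment();
        numBytesSent.add(bytes);
    }

    long requests() {
        return numStateReqs.sum();
    }

    long bytesSent() {
        return numBytesSent.sum();
    }

    /** Returns 0.0 when nothing has been recorded, to avoid dividing by zero. */
    double averageStateSize() {
        long reqs = numStateReqs.sum();
        return reqs == 0 ? 0.0 : (double) numBytesSent.sum() / reqs;
    }

    void reset() {
        numStateReqs.reset();
        numBytesSent.reset();
    }
}
```

`LongAdder` suits counters that many threads update and few read, which fits statistics updated from pool threads.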
-
getThreadPoolSize
public int getThreadPoolSize()
-
getThreadPoolCompletedTasks
public long getThreadPoolCompletedTasks()
-
requiredDownServices
public java.util.List<java.lang.Integer> requiredDownServices()
Description copied from class: Protocol
List of events that are required to be answered by some layer below
- Overrides:
requiredDownServices in class Protocol
-
resetStats
public void resetStats()
- Overrides:
resetStats in class Protocol
-
init
public void init() throws java.lang.Exception
Description copied from class: Protocol
Called after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
-
destroy
public void destroy()
Description copied from class: Protocol
This method is called on a JChannel.close(). Does some cleanup; after the call the VM will terminate
-
start
public void start() throws java.lang.Exception
Description copied from class: Protocol
This method is called on a JChannel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.
- Overrides:
start in class Protocol
- Throws:
java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
-
stop
public void stop()
Description copied from class: Protocol
This method is called on a JChannel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed
-
down
public java.lang.Object down(Event evt)
Description copied from class: Protocol
An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
-
up
public java.lang.Object up(Event evt)
Description copied from class: Protocol
An event was received from the protocol below. Usually the current protocol will want to examine the event type and, depending on its type, perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally, a) the event is discarded, or b) an event is sent down the stack using down_prot.down(), or c) the event (or another event) is sent up the stack using up_prot.up().
-
up
public java.lang.Object up(Message msg)
Description copied from class: Protocol
A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
isDigestNeeded
protected boolean isDigestNeeded()
When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt)
- Returns:
- true if use of digests is required, false otherwise
-
handleConfig
protected void handleConfig(java.util.Map<java.lang.String,java.lang.Object> config)
-
handleStateChunk
protected void handleStateChunk(Address sender, byte[] buffer, int offset, int length)
-
handleEOF
protected void handleEOF(Address sender)
-
handleException
protected void handleException(java.lang.Throwable exception)
-
getStateFromApplication
protected void getStateFromApplication(Address requester, java.io.OutputStream out, boolean use_separate_thread)
-
setStateInApplication
protected void setStateInApplication(java.io.InputStream in, java.lang.Object resource, Address provider)
-
closeBarrierAndSuspendStable
public void closeBarrierAndSuspendStable()
-
openBarrierAndResumeStable
public void openBarrierAndResumeStable()
-
openBarrier
protected void openBarrier()
-
resumeStable
protected void resumeStable()
-
sendEof
protected void sendEof(Address requester)
-
sendException
protected void sendException(Address requester, java.lang.Throwable exception)
-
createThreadPool
protected java.util.concurrent.ThreadPoolExecutor createThreadPool()
-
determineCoordinator
protected Address determineCoordinator()
-
handleViewChange
protected void handleViewChange(View v)
-
handle
public void handle(Address state_requester)
- Specified by:
handle in interface ProcessingQueue.Handler<Address>
-
handleStateReq
protected void handleStateReq(Address requester)
-
createStreamToRequester
protected void createStreamToRequester(Address requester)
Creates an OutputStream to the state requester to write the state
-
createStreamToProvider
protected abstract Tuple<java.io.InputStream,java.lang.Object> createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr) throws java.lang.Exception
Creates an InputStream to the state provider to read the state. Returns the input stream and a handback object as a tuple. The handback object is handed back to the subclass when done, or in case of an error (e.g. to clean up resources).
- Throws:
java.lang.Exception
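The handback pattern described above can be sketched without any JGroups types. In the example, `StreamWithHandback` stands in for the returned tuple and `HandbackDemo.close` for a subclass's cleanup hook; both are hypothetical, and treating the handback as a `Closeable` is an assumption made only for the demonstration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical pair of an input stream and an opaque handback object.
class StreamWithHandback {
    final InputStream in;
    final Object handback;

    StreamWithHandback(InputStream in, Object handback) {
        this.in = in;
        this.handback = handback;
    }
}

class HandbackDemo {

    /** Reads the stream to its end and returns the byte count; the handback is always handed back. */
    static int readAll(StreamWithHandback pair) throws IOException {
        try {
            int count = 0;
            while (pair.in.read() != -1)
                count++;
            return count;
        } finally {
            close(pair.handback); // runs on success and on error alike
        }
    }

    /** Cleanup hook: here it closes the resource if it happens to be Closeable. */
    static void close(Object resource) {
        if (resource instanceof Closeable) {
            try {
                ((Closeable) resource).close();
            } catch (IOException ignored) {
                // Nothing useful can be done at cleanup time in this sketch.
            }
        }
    }
}
```

Keeping the handback opaque lets each subclass decide what it needs for cleanup (for instance a socket) without the base class knowing the type.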
-
close
protected void close(java.lang.Object resource)
-
useAsyncStateDelivery
protected boolean useAsyncStateDelivery()
-
modifyStateResponseHeader
protected void modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)
-
handleStateRsp
protected void handleStateRsp(Address provider, StreamingStateTransfer.StateHeader hdr)
-
punchHoleFor
protected void punchHoleFor(Address member)
-
closeHoleFor
protected void closeHoleFor(Address member)
-
-