org.jgroups.protocols.pbcast
Class StreamingStateTransfer

java.lang.Object
  extended by org.jgroups.stack.Protocol
      extended by org.jgroups.protocols.pbcast.StreamingStateTransfer
Direct Known Subclasses:
STATE, STATE_SOCK

public abstract class StreamingStateTransfer
extends Protocol

Base class for state transfer protocols which use streaming (or chunking) to transfer state between two members.

The major advantage of this approach is that transferring application state to a joining member of a group does not require loading the complete application state into memory. The application state might, for example, reside entirely on some form of disk-based storage. The default STATE_TRANSFER protocol requires this state to be loaded entirely into memory before it is transferred to a group member, whereas the streaming state transfer protocols do not. The streaming state transfer protocols can therefore transfer very large application state (>1 GB) with little risk of the transfer causing an OutOfMemoryError.

Note that prior to 3.0, there was only one streaming protocol: STATE. In 3.0 the functionality was split between STATE and STATE_SOCK, and the common functionality was moved up into StreamingStateTransfer.
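
The bounded-memory idea behind streaming (or chunking) can be illustrated with a minimal, standalone sketch. It does not use JGroups: the class ChunkedStateCopy, the method copy and the constant CHUNK_SIZE are hypothetical names chosen for illustration, and the 8 KB chunk size is an assumption, not a documented default. The sketch only shows that state moves through a fixed-size buffer, so memory use does not grow with the total state size.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Standalone sketch (no JGroups dependency): copies state in fixed-size chunks. */
final class ChunkedStateCopy {
    /** Hypothetical chunk size; an assumption made for this example only. */
    static final int CHUNK_SIZE = 8 * 1024;

    /**
     * Copies everything from in to out while holding at most CHUNK_SIZE bytes
     * in memory at any time. Returns the number of bytes copied.
     */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] chunk = new byte[CHUNK_SIZE];
        long total = 0;
        int n;
        while ((n = in.read(chunk, 0, chunk.length)) != -1) {
            out.write(chunk, 0, n); // forward exactly the bytes that were read
            total += n;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        // In-memory streams keep the demo self-contained; a real application
        // would more likely use file-backed streams.
        byte[] state = new byte[100_000];
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(state), received);
        System.out.println("copied " + copied + " bytes");
    }
}
```

An application could plausibly call such a helper from its MessageListener.getState(java.io.OutputStream) callback, passing a file-backed InputStream as the source, so that state kept on disk is never loaded into memory as a whole.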

Since:
3.0
Author:
Bela Ban, Vladimir Blagojevic
See Also:
STATE_TRANSFER, STATE, STATE_SOCK

Nested Class Summary
protected  class StreamingStateTransfer.StateGetter
          Thread which invokes MessageListener.getState(java.io.OutputStream) in the application
static class StreamingStateTransfer.StateHeader
           
 
Field Summary
protected  double avg_state_size
           
protected  java.util.concurrent.atomic.AtomicBoolean barrier_closed
          Used to prevent spurious open and close barrier calls
protected  int buffer_size
           
protected  boolean flushProtocolInStack
           
protected  Address local_addr
           
protected  int max_pool
           
protected  java.util.List<Address> members
           
protected  java.util.concurrent.atomic.AtomicLong num_bytes_sent
           
protected  java.util.concurrent.atomic.AtomicInteger num_state_reqs
           
protected  java.util.Map<Address,java.io.OutputStream> pending_state_transfers
          Whenever we get a state transfer request, we create an OutputStream and add the state requester's address and the OutputStream to this map.
protected  long pool_thread_keep_alive
           
protected  java.util.concurrent.locks.Lock state_lock
          Used to synchronize all state requests and responses
protected  Address state_provider
           
protected  java.util.concurrent.ThreadPoolExecutor thread_pool
          Thread pool (configured with max_pool and pool_thread_keep_alive) to run StreamingStateTransfer.StateGetter threads on
 
Fields inherited from class org.jgroups.stack.Protocol
down_prot, ergonomics, id, log, name, stack, stats, up_prot
 
Constructor Summary
StreamingStateTransfer()
           
 
Method Summary
 void closeBarrierAndSuspendStable()
           
protected abstract  void createStreamToProvider(Address provider, StreamingStateTransfer.StateHeader hdr)
          Creates an InputStream to the state provider to read the state
protected abstract  void createStreamToRequester(Address requester)
          Creates an OutputStream to the state requester to write the state
protected  java.util.concurrent.ThreadPoolExecutor createThreadPool()
           
 void destroy()
          This method is called on a Channel.close().
protected  Address determineCoordinator()
           
 java.lang.Object down(Event evt)
          An event is to be sent down the stack.
 double getAverageStateSize()
           
 long getNumberOfStateBytesSent()
           
 int getNumberOfStateRequests()
           
protected  void getStateFromApplication(Address requester, java.io.OutputStream out, boolean use_separate_thread)
           
 long getThreadPoolCompletedTasks()
           
 int getThreadPoolSize()
           
protected  void handleConfig(java.util.Map<java.lang.String,java.lang.Object> config)
           
protected  void handleEOF(Address sender)
           
protected  void handleException(java.lang.Throwable exception)
           
protected  void handleStateChunk(Address sender, byte[] buffer, int offset, int length)
           
protected  void handleStateReq(Address requester)
           
protected  void handleViewChange(View v)
           
 void init()
          Called after instance has been created (null constructor) and before protocol is started.
protected  boolean isDigestNeeded()
          When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt)
protected  void modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)
           
 void openBarrierAndResumeStable()
           
protected  void removeRequester(Address requester)
           
 java.util.List<java.lang.Integer> requiredDownServices()
          List of events that are required to be answered by some layer below
 void resetStats()
           
protected  void sendEof(Address requester)
           
protected  void sendException(Address requester, java.lang.Throwable exception)
           
protected  void setStateInApplication(Address provider, java.io.InputStream in, Digest digest)
           
 void start()
          This method is called on a Channel.connect(String).
 void stop()
          This method is called on a Channel.disconnect().
 java.lang.Object up(Event evt)
          An event was received from the layer below.
 
Methods inherited from class org.jgroups.stack.Protocol
dumpStats, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, printStats, providedDownServices, providedUpServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

buffer_size

protected int buffer_size

max_pool

protected int max_pool

pool_thread_keep_alive

protected long pool_thread_keep_alive

num_state_reqs

protected final java.util.concurrent.atomic.AtomicInteger num_state_reqs

num_bytes_sent

protected final java.util.concurrent.atomic.AtomicLong num_bytes_sent

avg_state_size

protected double avg_state_size

local_addr

protected Address local_addr

state_provider

protected volatile Address state_provider

members

protected final java.util.List<Address> members

flushProtocolInStack

protected volatile boolean flushProtocolInStack

barrier_closed

protected java.util.concurrent.atomic.AtomicBoolean barrier_closed
Used to prevent spurious open and close barrier calls
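
One common way to use an AtomicBoolean for this purpose is a compare-and-set guard, sketched below. This illustrates the idiom only and is not taken from the JGroups implementation; the class BarrierGuardSketch and all of its members are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Standalone sketch: a flag that lets only real state transitions through. */
final class BarrierGuardSketch {
    private final AtomicBoolean barrierClosed = new AtomicBoolean(false);
    /** Counters showing how often the guarded actions actually ran. */
    final AtomicInteger closeActions = new AtomicInteger();
    final AtomicInteger openActions = new AtomicInteger();

    /** Runs the close action only on the open -> closed transition. */
    boolean closeBarrier() {
        if (!barrierClosed.compareAndSet(false, true))
            return false; // already closed: spurious call, nothing to do
        closeActions.incrementAndGet();
        return true;
    }

    /** Runs the open action only on the closed -> open transition. */
    boolean openBarrier() {
        if (!barrierClosed.compareAndSet(true, false))
            return false; // already open: spurious call, nothing to do
        openActions.incrementAndGet();
        return true;
    }

    public static void main(String[] args) {
        BarrierGuardSketch guard = new BarrierGuardSketch();
        guard.closeBarrier();
        guard.closeBarrier(); // ignored, the barrier is already closed
        guard.openBarrier();
        System.out.println("close actions: " + guard.closeActions.get()
                + ", open actions: " + guard.openActions.get());
    }
}
```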


thread_pool

protected java.util.concurrent.ThreadPoolExecutor thread_pool
Thread pool (configured with max_pool and pool_thread_keep_alive) to run StreamingStateTransfer.StateGetter threads on
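
A pool driven by these two settings could be built roughly as follows. This is a sketch under stated assumptions, not the body of createThreadPool(): the core size of 0, the millisecond unit for the keep-alive time, the SynchronousQueue and the CallerRunsPolicy handler are illustrative choices, and the class StatePoolSketch is hypothetical.

```java
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Standalone sketch: a pool bounded by a maximum size and a keep-alive time. */
final class StatePoolSketch {
    /**
     * Creates a pool that starts with no threads, grows to at most maxPool
     * threads, and retires idle threads after keepAliveMillis (assumed unit: ms).
     */
    static ThreadPoolExecutor createPool(int maxPool, long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxPool, keepAliveMillis, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>()); // hand-off queue, no buffering
        // When all threads are busy, run the task on the submitting thread
        // instead of rejecting it (an assumption made for this sketch).
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return pool;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = createPool(5, 20_000L);
        Future<String> result = pool.submit(() -> "state getter finished");
        System.out.println(result.get());
        pool.shutdown();
    }
}
```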


pending_state_transfers

protected final java.util.Map<Address,java.io.OutputStream> pending_state_transfers
Whenever we get a state transfer request, we create an OutputStream and add the state requester's address and the OutputStream to this map. The state is fetched from the application on a separate thread, a StreamingStateTransfer.StateGetter thread.
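
The bookkeeping described above can be sketched in isolation as follows. The code does not use JGroups: a String stands in for Address, and the StateWriter interface, the class PendingTransfersSketch and its method names are hypothetical. Removing the entry once the task finishes is also an assumption of this sketch, not a statement about how removeRequester(Address) is used.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Standalone sketch: tracks one output stream per state requester. */
final class PendingTransfersSketch {
    /** Hypothetical stand-in for the application callback that writes state. */
    interface StateWriter {
        void writeState(OutputStream out) throws IOException;
    }

    /** Requester (a String here, an Address in JGroups) mapped to its stream. */
    final Map<String, OutputStream> pending = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final StateWriter application;

    PendingTransfersSketch(StateWriter application) {
        this.application = application;
    }

    /** Registers the requester's stream, then fetches state on a pool thread. */
    Future<Void> handleRequest(String requester, OutputStream out) {
        if (pending.putIfAbsent(requester, out) != null)
            throw new IllegalStateException("transfer already pending for " + requester);
        Callable<Void> stateGetter = () -> {
            try {
                application.writeState(out); // the application streams its state
                out.flush();
            } finally {
                pending.remove(requester);   // done or failed: forget the requester
            }
            return null;
        };
        return pool.submit(stateGetter);
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        PendingTransfersSketch transfers =
                new PendingTransfersSketch(out -> out.write("state".getBytes("UTF-8")));
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        transfers.handleRequest("requester-A", sink).get(); // wait for completion
        System.out.println("bytes written: " + sink.size());
        transfers.shutdown();
    }
}
```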


state_lock

protected final java.util.concurrent.locks.Lock state_lock
Used to synchronize all state requests and responses

Constructor Detail

StreamingStateTransfer

public StreamingStateTransfer()
Method Detail

getNumberOfStateRequests

public int getNumberOfStateRequests()

getNumberOfStateBytesSent

public long getNumberOfStateBytesSent()

getAverageStateSize

public double getAverageStateSize()

getThreadPoolSize

public int getThreadPoolSize()

getThreadPoolCompletedTasks

public long getThreadPoolCompletedTasks()

requiredDownServices

public java.util.List<java.lang.Integer> requiredDownServices()
Description copied from class: Protocol
List of events that are required to be answered by some layer below

Overrides:
requiredDownServices in class Protocol

resetStats

public void resetStats()
Overrides:
resetStats in class Protocol

init

public void init()
          throws java.lang.Exception
Description copied from class: Protocol
Called after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.

Overrides:
init in class Protocol
Throws:
java.lang.Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception

destroy

public void destroy()
Description copied from class: Protocol
This method is called on a Channel.close(). Does some cleanup; after the call the VM will terminate

Overrides:
destroy in class Protocol

start

public void start()
           throws java.lang.Exception
Description copied from class: Protocol
This method is called on a Channel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.

Overrides:
start in class Protocol
Throws:
java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so Channel.connect(String) will throw an exception

stop

public void stop()
Description copied from class: Protocol
This method is called on a Channel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed

Overrides:
stop in class Protocol

down

public java.lang.Object down(Event evt)
Description copied from class: Protocol
An event is to be sent down the stack. The layer may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the layer may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down(). In case of a GET_ADDRESS event (which tries to retrieve the stack's address from one of the bottom layers), the layer may need to send a new response event back up the stack using up_prot.up().

Overrides:
down in class Protocol

up

public java.lang.Object up(Event evt)
Description copied from class: Protocol
An event was received from the layer below. Usually the current layer will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().

Overrides:
up in class Protocol

isDigestNeeded

protected boolean isDigestNeeded()
When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt)

Returns:
true if use of digests is required, false otherwise

handleConfig

protected void handleConfig(java.util.Map<java.lang.String,java.lang.Object> config)

handleStateChunk

protected void handleStateChunk(Address sender,
                                byte[] buffer,
                                int offset,
                                int length)

handleEOF

protected void handleEOF(Address sender)

handleException

protected void handleException(java.lang.Throwable exception)

getStateFromApplication

protected void getStateFromApplication(Address requester,
                                       java.io.OutputStream out,
                                       boolean use_separate_thread)

removeRequester

protected void removeRequester(Address requester)

setStateInApplication

protected void setStateInApplication(Address provider,
                                     java.io.InputStream in,
                                     Digest digest)

closeBarrierAndSuspendStable

public void closeBarrierAndSuspendStable()

openBarrierAndResumeStable

public void openBarrierAndResumeStable()

sendEof

protected void sendEof(Address requester)

sendException

protected void sendException(Address requester,
                             java.lang.Throwable exception)

createThreadPool

protected java.util.concurrent.ThreadPoolExecutor createThreadPool()

determineCoordinator

protected Address determineCoordinator()

handleViewChange

protected void handleViewChange(View v)

handleStateReq

protected void handleStateReq(Address requester)

createStreamToRequester

protected abstract void createStreamToRequester(Address requester)
Creates an OutputStream to the state requester to write the state


createStreamToProvider

protected abstract void createStreamToProvider(Address provider,
                                               StreamingStateTransfer.StateHeader hdr)
Creates an InputStream to the state provider to read the state


modifyStateResponseHeader

protected void modifyStateResponseHeader(StreamingStateTransfer.StateHeader hdr)


Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.