JBoss.org Community Documentation

Chapter 4. Building Blocks

4.1. MessageDispatcher
4.1.1. RequestOptions
4.1.2. Requests and target destinations
4.1.3. Example
4.2. RpcDispatcher
4.2.1. Example
4.2.2. Response filters
4.3. Asynchronous invocation in MessageDispatcher and RpcDispatcher
4.4. ReplicatedHashMap
4.5. ReplCache
4.6. Cluster wide locking
4.6.1. Locking and merges
4.7. Cluster wide task execution
4.8. Cluster wide atomic counters
4.8.1. Design

Building blocks are layered on top of channels, and can be used instead of channels whenever a higher-level interface is required.

Whereas channels are simple socket-like constructs, building blocks may offer a far more sophisticated interface. In some cases, building blocks offer access to the underlying channel, so that -- if the building block at hand does not offer a certain functionality -- the channel can be accessed directly. Building blocks are located in the org.jgroups.blocks package.

Channels are simple patterns to asynchronously send and receive messages. However, a significant number of communication patterns in group communication require synchronous communication. For example, a sender would like to send a message to the group and wait for all responses. Or another application would like to send a message to the group and wait only until the majority of the receivers have sent a response, or until a timeout occurred.

MessageDispatcher provides blocking (and non-blocking) request sending and response correlation. It offers synchronous (as well as asynchronous) message sending with request-response correlation, e.g. matching one or multiple responses with the original request.

An example of using this class would be to send a request message to all cluster members, and block until all responses have been received, or until a timeout has elapsed.

Contrary to Section 4.2, “RpcDispatcher”, MessageDispatcher deals with sending message requests and correlating message responses, while RpcDispatcher deals with invoking method calls and correlating responses. RpcDispatcher extends MessageDispatcher, and offers an even higher level of abstraction over MessageDispatcher.

RpcDispatcher is essentially a way to invoke remote procedure calls (RPCs) across a cluster.

Both MessageDispatcher and RpcDispatcher sit on top of a channel; therefore an instance of MessageDispatcher is created with a channel as argument. It can now be used in both client and server role: a client sends requests and receives responses and a server receives requests and sends responses. MessageDispatcher allows for an application to be both at the same time. To be able to serve requests in the server role, the RequestHandler.handle() method has to be implemented:

Object handle(Message msg) throws Exception;

The handle() method is called whenever a request is received. It must return a value (must be serializable, but can be null) or throw an exception. The returned value will be sent to the sender, and exceptions are also propagated to the sender.

Before looking at the methods of MessageDispatcher, let's take a look at RequestOptions first.

Every message sending in MessageDispatcher or request invocation in RpcDispatcher is governed by an instance of RequestOptions. This is a class which can be passed to a call to define the various options related to the call, e.g. a timeout, whether the call should block or not, the flags (see Section 5.13, “Tagging messages with flags”) etc.

The various options are:

  • Response mode: this determines whether the call is blocking and - if yes - how long it should block. The modes are:
    • GET_ALL: block until responses from all members (minus the suspected ones) have been received.
    • GET_NONE: wait for none. This makes the call non-blocking
    • GET_FIRST: block until the first response (from anyone) has been received
    • GET_MAJORITY: block until a majority of members have responded
  • Timeout: number of milliseconds we're willing to block. If the call hasn't terminated after the timeout elapsed, a TimeoutException will be thrown. A timeout of 0 means to wait forever. The timeout is ignored if the call is non-blocking (mode=GET_NONE)
  • Anycasting: if set to true, this means we'll use unicasts to individual members rather than sending multicasts. For example, if we have TCP as transport, and the cluster is {A,B,C,D,E}, and we send a message through MessageDispatcher where dests={C,D}, and we do not want to send the request to everyone, then we'd set anycasting=true. This will send the request to C and D only, as unicasts, which is better if we use a transport such as TCP which cannot use IP multicasting (sending 1 packet to reach all members).
  • Response filter: A RspFilter allows for filtering of responses and user-defined termination of a call. For example, if we expect responses from 10 members, but can return after having received 3 non-null responses, a RspFilter could be used. See Section 4.2.2, “Response filters” for a discussion on response filters.
  • Scope: a short, defining a scope. This allows for concurrent delivery of messages from the same sender. See Section 5.4.4, “Scopes: concurrent message delivery for messages from the same sender” for a discussion on scopes.
  • Flags: the various flags to be passed to the message, see Section 5.13, “Tagging messages with flags” for details.
  • Exclusion list: here we can pass a list of members (addresses) that should be excluded. For example, if the view is A,B,C,D,E, and we set the exclusion list to A,C then the caller will wait for responses from everyone except A and C. Also, every recipient that's in the exclusion list will discard the message.
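The response modes and the exclusion list can be summarized with a small model. The sketch below is illustrative only: the class, enum and method names are hypothetical and not part of the JGroups API, and it assumes that a majority is computed over the expected responders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Illustrative model only; these names are not part of the JGroups API.
class ResponseModeSketch {
    enum Mode { GET_ALL, GET_NONE, GET_FIRST, GET_MAJORITY }

    // Members we expect a response from: the view minus the exclusion list.
    static List<String> expectedResponders(List<String> view, Collection<String> excluded) {
        List<String> result = new ArrayList<>(view);
        result.removeAll(excluded);
        return result;
    }

    // Number of responses a call waits for, given the number of expected
    // responders and how many of them were suspected in the meantime.
    static int responsesNeeded(Mode mode, int expected, int suspected) {
        switch (mode) {
            case GET_NONE:     return 0;                          // non-blocking
            case GET_FIRST:    return Math.min(1, expected);
            case GET_MAJORITY: return expected == 0 ? 0 : expected / 2 + 1;
            default:           return Math.max(0, expected - suspected); // GET_ALL
        }
    }

    public static void main(String[] args) {
        List<String> view = Arrays.asList("A", "B", "C", "D", "E");
        List<String> expected = expectedResponders(view, Arrays.asList("A", "C"));
        System.out.println("expected responders: " + expected);
        System.out.println("GET_ALL waits for: "
            + responsesNeeded(Mode.GET_ALL, expected.size(), 0));
    }
}
```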

An example of how to use RequestOptions is:



RpcDispatcher disp;
// block until all members have responded; a timeout of 0 waits forever
RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, 0)
                    .setFlags(Message.NO_FC).setFlags(Message.DONT_BUNDLE);
Object val=disp.callRemoteMethod(target, method_call, opts);
          

The methods to send requests are:



public <T> RspList<T>
       castMessage(final Collection<Address> dests,
                   Message msg,
                   RequestOptions options) throws Exception;
public <T> NotifyingFuture<RspList<T>>
       castMessageWithFuture(final Collection<Address> dests,
                             Message msg,
                             RequestOptions options) throws Exception;
public <T> T sendMessage(Message msg,
                         RequestOptions opts) throws Exception;
public <T> NotifyingFuture<T>
       sendMessageWithFuture(Message msg,
                             RequestOptions options) throws Exception;
      

castMessage() sends a message to all members defined in dests. If dests is null, the message will be sent to all members of the current cluster. Note that a possible destination set in the message will be overridden. If a message is sent synchronously (defined by options.mode) then options.timeout defines the maximum amount of time (in milliseconds) to wait for the responses.

castMessage() returns a RspList, which contains a map of addresses and Rsps; there's one Rsp per member listed in dests.

A Rsp instance contains the response value (or null), an exception if the target handle() method threw an exception, whether the target member was suspected, or not, and so on. See the example below for more details.

castMessageWithFuture() returns immediately, with a future. The future can be used to fetch the response list (now or later), and it also allows for installation of a callback which will be invoked whenever the future is done. See Section 4.2.1.1, “Asynchronous calls with futures” for details on how to use NotifyingFutures.
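The two ways of consuming such a future (fetching the result, or installing a callback) can be sketched with the JDK's CompletableFuture. This is only a stand-in chosen for illustration: NotifyingFuture is a JGroups type with its own API, and castAsync() below is a hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// CompletableFuture is used as a stand-in here; it is not NotifyingFuture.
class FutureCallbackSketch {
    // Hypothetical non-blocking send: returns at once, completes later.
    static CompletableFuture<String> castAsync(String request) {
        return CompletableFuture.supplyAsync(() -> "response to " + request);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = castAsync("ping");
        // Option 1: a callback which is invoked once the future is done.
        future.thenAccept(rsp -> System.out.println("callback got: " + rsp));
        // Option 2: fetch the result, blocking for at most 5 seconds.
        System.out.println("get() returned: " + future.get(5, TimeUnit.SECONDS));
    }
}
```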

sendMessage() allows an application programmer to send a unicast message to a single cluster member and receive the response. The destination of the message has to be non-null (valid address of a member). The mode argument is ignored (it is by default set to ResponseMode.GET_FIRST) unless it is set to GET_NONE, in which case the request becomes asynchronous, i.e. we will not wait for the response.

sendMessageWithFuture() returns immediately with a future, which can be used to fetch the result.

One advantage of using this building block is that failed members are removed from the set of expected responses. For example, when sending a message to 10 members and waiting for all responses, and 2 members crash before being able to send a response, the call will return with 8 valid responses and 2 marked as failed. The return value of castMessage() is a RspList which contains all responses (not all methods shown):



public class RspList<T> implements Map<Address,Rsp> {
    public boolean isReceived(Address sender);
    public int     numSuspectedMembers();
    public List<T> getResults();
    public List<Address> getSuspectedMembers();
    public boolean isSuspected(Address sender);
    public Object  get(Address sender);
    public int     size();
}
      

isReceived() checks whether a response from sender has already been received. Note that this is false as long as no response has yet been received and the member has not been marked as failed. numSuspectedMembers() returns the number of members that failed (e.g. crashed) during the wait for responses. getResults() returns a list of return values. get() returns the return value for a specific member.
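The bookkeeping described above (10 members, 2 of which crash before responding) can be modelled as follows. This is a simplified, hypothetical model and not the JGroups RspList or Rsp classes; members are plain strings instead of addresses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a response list; not the JGroups RspList class.
class ResponseTallySketch {
    static class Reply {
        Object  value;
        boolean received;
        boolean suspected;
    }

    private final Map<String, Reply> replies = new LinkedHashMap<>();

    ResponseTallySketch(List<String> members) {
        for (String member : members)
            replies.put(member, new Reply());
    }

    void onResponse(String sender, Object value) {
        Reply reply = replies.get(sender);
        if (reply != null) {
            reply.value = value;
            reply.received = true;
        }
    }

    // A member that failed before responding is marked, not waited for.
    void onSuspect(String member) {
        Reply reply = replies.get(member);
        if (reply != null && !reply.received)
            reply.suspected = true;
    }

    // A GET_ALL style wait ends once every member responded or was suspected.
    boolean isDone() {
        for (Reply reply : replies.values())
            if (!reply.received && !reply.suspected)
                return false;
        return true;
    }

    int numSuspectedMembers() {
        int count = 0;
        for (Reply reply : replies.values())
            if (reply.suspected)
                count++;
        return count;
    }

    List<Object> getResults() {
        List<Object> results = new ArrayList<>();
        for (Reply reply : replies.values())
            if (reply.received)
                results.add(reply.value);
        return results;
    }
}
```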

This section shows an example of how to use a MessageDispatcher.



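import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.MessageDispatcher;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.ResponseMode;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

public class MessageDispatcherTest implements RequestHandler {
    Channel            channel;
    MessageDispatcher  disp;
    RspList            rsp_list;
    String             props; // to be set by application programmer
    public void start() throws Exception {
        channel=new JChannel(props);
        disp=new MessageDispatcher(channel, null, null, this);
        channel.connect("MessageDispatcherTestGroup");
        // block until all members have responded; a timeout of 0 waits forever
        RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, 0);
        for(int i=0; i < 10; i++) {
            Util.sleep(100);
            System.out.println("Casting message #" + i);
            // dests=null: send to all members of the cluster
            rsp_list=disp.castMessage(null,
                new Message(null, null, "Number #" + i),
                opts);
            System.out.println("Responses:\n" +rsp_list);
        }
        // stop the dispatcher before closing the channel it sits on
        disp.stop();
        channel.close();
    }
    // server role: invoked for every request received
    public Object handle(Message msg) throws Exception {
        System.out.println("handle(): " + msg);
        return "Success !";
    }
    public static void main(String[] args) {
        try {
            new MessageDispatcherTest().start();
        }
        catch(Exception e) {
            System.err.println(e);
        }
    }
}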
      

The example starts with the creation of a channel. Next, an instance of MessageDispatcher is created on top of the channel. Then the channel is connected. The MessageDispatcher will from now on send requests, receive matching responses (client role) and receive requests and send responses (server role).

We then send 10 messages to the group and wait for all responses. The timeout argument is 0, which causes the call to block until all responses have been received.

The handle() method simply prints out a message and returns a string. This will be sent back to the caller as a response value (in Rsp.value). Had the call thrown an exception, Rsp.exception would be set instead.

Finally both the MessageDispatcher and channel are closed.

RpcDispatcher is derived from MessageDispatcher. It allows a programmer to invoke remote methods in all (or single) cluster members and optionally wait for the return value(s). An application will typically create a channel first, and then create an RpcDispatcher on top of it. RpcDispatcher can be used to invoke remote methods (client role) and at the same time be called by other members (server role).

Compared to MessageDispatcher, no handle() method needs to be implemented. Instead the methods to be called can be placed directly in the class using regular method definitions (see example below). The methods will get invoked using reflection.

To invoke remote method calls (unicast and multicast) the following methods are used:



public <T> RspList<T>
       callRemoteMethods(Collection<Address> dests,
                         String method_name,
                         Object[] args,
                         Class[] types,
                         RequestOptions options) throws Exception;
public <T> RspList<T>
       callRemoteMethods(Collection<Address> dests,
                         MethodCall method_call,
                         RequestOptions options) throws Exception;
public <T> NotifyingFuture<RspList<T>>
       callRemoteMethodsWithFuture(Collection<Address> dests,
                                   MethodCall method_call,
                                   RequestOptions options) throws Exception;
public <T> T callRemoteMethod(Address dest,
                              String method_name,
                              Object[] args,
                              Class[] types,
                              RequestOptions options) throws Exception;
public <T> T callRemoteMethod(Address dest,
                              MethodCall call,
                              RequestOptions options) throws Exception;
public <T> NotifyingFuture<T>
       callRemoteMethodWithFuture(Address dest,
                                  MethodCall call,
                                  RequestOptions options) throws Exception;
    

The family of callRemoteMethods() methods is invoked with a list of receiver addresses. If the list is null, the method will be invoked in all cluster members (including the sender). Each call takes the target members to invoke it on, a method and a RequestOptions instance.

The method can be given as (1) the method name, (2) the arguments and (3) the argument types, or a MethodCall (containing a java.lang.reflect.Method and arguments) can be given instead.

As with MessageDispatcher, a RspList or a future to a RspList is returned.

The family of callRemoteMethod() methods takes almost the same parameters, except that there is only one destination address instead of a list. If the dest argument is null, the call will fail.

The callRemoteMethod() calls return the actual result (of type T), or throw an exception if the method threw an exception on the target member.

Java's Reflection API is used to find the correct method in the target member according to the method name and number and types of supplied arguments. There is a runtime exception if a method cannot be resolved.

Note that we could also use method IDs and the MethodLookup interface to resolve methods, which is faster and has every RPC carry less data across the wire. To see how this is done, have a look at some of the MethodLookup implementations, e.g. in RpcDispatcherSpeedTest.
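The difference between the two resolution strategies can be sketched with plain reflection. The code below is not the RpcDispatcher or MethodLookup implementation; the class names, the ID value 1 and the lookup table are assumptions made for illustration.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Illustrative only; not the RpcDispatcher or MethodLookup implementation.
class MethodResolutionSketch {
    // Example target object with a method to be invoked.
    static class Target {
        public int print(int number) { return number * 2; }
    }

    // (1) Resolve by method name and argument types on every call.
    static Object invokeByName(Object target, String name, Object[] args, Class<?>[] types) {
        try {
            Method method = target.getClass().getMethod(name, types);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            // an unresolvable method surfaces as a runtime exception
            throw new IllegalStateException("cannot invoke " + name, e);
        }
    }

    // (2) Resolve by a small numeric ID using a table that is built once;
    // only the ID (not name and types) would have to travel with a request.
    static final Map<Short, Method> METHODS = new HashMap<>();
    static {
        try {
            METHODS.put((short) 1, Target.class.getMethod("print", int.class));
        } catch (NoSuchMethodException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    static Object invokeById(Object target, short id, Object... args) {
        Method method = METHODS.get(id);
        if (method == null)
            throw new IllegalStateException("no method registered for ID " + id);
        try {
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke method with ID " + id, e);
        }
    }
}
```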

The code below shows an example of using RpcDispatcher:



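import org.jgroups.JChannel;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.ResponseMode;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

public class RpcDispatcherTest {
    JChannel           channel;
    RpcDispatcher      disp;
    RspList            rsp_list;
    String             props; // set by application
    // the method which is invoked remotely
    public static int print(int number) throws Exception {
        return number * 2;
    }
    public void start() throws Exception {
        MethodCall call=new MethodCall(getClass().getMethod("print", int.class));
        // block until all members have responded, but for 5 seconds at most
        RequestOptions opts=new RequestOptions(ResponseMode.GET_ALL, 5000);
        channel=new JChannel(props);
        disp=new RpcDispatcher(channel, this);
        channel.connect("RpcDispatcherTestGroup");
        for(int i=0; i < 10; i++) {
            Util.sleep(100);
            // dests=null: invoke print() in all cluster members
            rsp_list=disp.callRemoteMethods(null,
                                            "print",
                                            new Object[]{i},
                                            new Class[]{int.class},
                                            opts);
            // Alternative: use a (prefabricated) MethodCall:
            // call.setArgs(i);
            // rsp_list=disp.callRemoteMethods(null, call, opts);
            System.out.println("Responses: " + rsp_list);
        }
        // stop the dispatcher before closing the channel it sits on
        disp.stop();
        channel.close();
    }
    public static void main(String[] args) throws Exception {
        new RpcDispatcherTest().start();
    }
}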
     

Class RpcDispatcherTest defines method print() which will be called subsequently. The entry point start() creates a channel and an RpcDispatcher which is layered on top. Method callRemoteMethods() then invokes the remote print() in all cluster members (also in the caller). When all responses have been received, the call returns and the responses are printed.

As can be seen, the RpcDispatcher building block reduces the amount of code that needs to be written to implement RPC-based group communication applications by providing a higher abstraction level between the application and the primitive channels.

Response filters allow application code to hook into the reception of responses from cluster members and can let the request-response execution and correlation code know (1) whether a response is acceptable and (2) whether more responses are needed, or whether the call (if blocking) can return. The RspFilter interface looks as follows:



public interface RspFilter {
    boolean isAcceptable(Object response, Address sender);
    boolean needMoreResponses();
}
          

isAcceptable() is given a response value and the address of the member which sent the response, and needs to decide whether the response is valid (should return true) or not (should return false).

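needMoreResponses() determines whether a call returns or not: as long as it returns true, the call keeps waiting for further responses; once it returns false, a blocking call can return.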

The sample code below shows how to use a RspFilter:



public void testResponseFilter() throws Exception {
    final long timeout = 10 * 1000 ;
    RequestOptions opts;
    opts=new RequestOptions(ResponseMode.GET_ALL,
                            timeout, false,
                            new RspFilter() {
                                int num=0;
                                public boolean isAcceptable(Object response,
                                                            Address sender) {
                                    boolean retval=((Integer)response).intValue() > 1;
                                    if(retval)
                                        num++;
                                    return retval;
                                }
                                public boolean needMoreResponses() {
                                    return num < 2;
                                }
                            });
    RspList rsps=disp1.callRemoteMethods(null, "foo", null, null, opts);
    System.out.println("responses are:\n" + rsps);
    assert rsps.size() == 3;
    assert rsps.numReceived() == 2;
}
          

Here, we invoke a cluster wide RPC (dests=null), which blocks (mode=GET_ALL) for 10 seconds max (timeout=10000), but also passes an instance of RspFilter to the call (in options).

The filter accepts all responses whose value is greater than 1, and returns as soon as it has received 2 responses which satisfy the above condition.
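How such a filter ends a call early can be traced with a small simulation. The sketch uses a local copy of the filter contract (with a String sender instead of an Address) and a hypothetical collect() loop; the order in which JGroups consults the two methods is an assumption here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simulation only; the loop in collect() is not JGroups code.
class RspFilterSketch {
    // Local copy of the filter contract; the sender is a plain String here.
    interface Filter {
        boolean isAcceptable(Object response, String sender);
        boolean needMoreResponses();
    }

    // Feeds responses to the filter in arrival order, keeps the accepted
    // ones, and stops as soon as the filter needs no more responses.
    static List<Object> collect(List<Object> responses, Filter filter) {
        List<Object> accepted = new ArrayList<>();
        for (int i = 0; i < responses.size() && filter.needMoreResponses(); i++) {
            Object response = responses.get(i);
            if (filter.isAcceptable(response, "member-" + i))
                accepted.add(response);
        }
        return accepted;
    }

    // Same rule as the filter in the sample code: accept values > 1 and
    // stop after 2 accepted responses.
    static Filter twoValuesGreaterThanOne() {
        return new Filter() {
            int num = 0;
            public boolean isAcceptable(Object response, String sender) {
                boolean retval = ((Integer) response) > 1;
                if (retval)
                    num++;
                return retval;
            }
            public boolean needMoreResponses() {
                return num < 2;
            }
        };
    }

    public static void main(String[] args) {
        List<Object> arrived = Arrays.<Object>asList(1, 2, 3, 4);
        System.out.println(collect(arrived, twoValuesGreaterThanOne()));
    }
}
```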

By default, a message received by a MessageDispatcher or RpcDispatcher is dispatched into application code by calling method handle from RequestHandler:



public interface RequestHandler {
    Object handle(Message msg) throws Exception;
}
        

In the case of RpcDispatcher, the handle() method converts the message's contents into a method call, invokes the method against the target object and returns the result (or throws an exception). The return value of handle() is then sent back to the sender of the message.

The invocation is synchronous, i.e. done on the thread responsible for dispatching this particular message from the network up the stack all the way into the application. The thread is therefore unusable for the duration of the method invocation.

If the invocation takes a while, e.g. because locks are acquired or the application waits on some I/O, the current thread stays busy and another thread has to be used for the next request message. This can quickly lead to the thread pool being exhausted, or to many messages getting queued if the pool has an associated queue.

Therefore a new way of dispatching messages to the application was devised: the asynchronous invocation API:



public interface AsyncRequestHandler extends RequestHandler {
    void handle(Message request, Response response) throws Exception;
}
        

Extending RequestHandler, interface AsyncRequestHandler adds an additional method taking a request message and a Response object. The request message contains the same information as before (e.g. a method call plus args). The Response argument is used to send a reply (if needed) at a later time, when processing is done.



public interface Response {
    void send(Object reply, boolean is_exception);
}
        

Response encapsulates information about the request (e.g. request ID and sender), and has method send() to send a response. The is_exception parameter can be set to true if the reply is actually an exception, e.g. one that was thrown when handle() ran application code.

The advantage of the new API is that it can, but doesn't have to, be used asynchronously. The default implementation still uses the synchronous invocation style:



public void handle(Message request, Response response) throws Exception {
    Object retval=handle(request);
    if(response != null)
        response.send(retval, false);
}
        

Method handle() is called, which synchronously calls into application code and returns a result, which is subsequently sent back to the sender of the request message.

However, an application could subclass MessageDispatcher or RpcDispatcher (as done in Infinispan), or it could set a custom request handler via MessageDispatcher.setRequestHandler(), and implement handle() by dispatching the processing to a thread from a thread pool. The thread which guided the request message from the network up to this point would be therefore immediately released and could be used for other messages. The response would be sent whenever the invocation of application code is done, and thus the thread from the thread pool would not be blocked on I/O, trying to acquire locks or anything else that blocks in application code.
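A handler which hands the work to an application thread pool could look roughly like the sketch below. It uses a local stand-in for the Response interface and a String instead of a Message, so that it runs without JGroups; process() is a hypothetical piece of application logic.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch with local stand-in types; not an AsyncRequestHandler implementation.
class AsyncHandlerSketch {
    // Local stand-in mirroring the Response interface shown above.
    interface Response {
        void send(Object reply, boolean is_exception);
    }

    private final ExecutorService pool;

    AsyncHandlerSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // Hypothetical application logic which may block (locks, I/O, ...).
    Object process(String request) throws Exception {
        if (request.isEmpty())
            throw new IllegalArgumentException("empty request");
        return request.toUpperCase();
    }

    // Returns immediately, so the calling thread is free for other messages;
    // the reply is sent later from a pool thread.
    void handle(final String request, final Response response) {
        pool.execute(() -> {
            try {
                Object retval = process(request);
                if (response != null)
                    response.send(retval, false);
            } catch (Exception e) {
                if (response != null)
                    response.send(e, true);
            }
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        new AsyncHandlerSketch(pool)
            .handle("hello", (reply, isEx) -> System.out.println("reply: " + reply));
        pool.shutdown(); // previously submitted work still completes
    }
}
```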

To set the mode which is used, method MessageDispatcher.asyncDispatching(boolean) can be used. This can be changed even at runtime, to switch between sync and async invocation style.

Asynchronous invocation is typically used in conjunction with an application thread pool. The application knows (JGroups doesn't) which requests can be processed in parallel and which ones can't. For example, all OOB calls could be dispatched directly to the thread pool, as ordering of OOB requests is not important, but regular requests should be added to a queue where they are processed sequentially.

The main benefit here is that request dispatching (and ordering) is now under application control if the application wants to do that. If not, we can still use synchronous invocation.

A good example where asynchronous invocation makes sense is the replication of web sessions. If a cluster node A has 1000 web sessions, then replication of updates across the cluster generates messages from A. Because JGroups delivers messages from the same sender sequentially, even updates to unrelated web sessions are delivered in strict order.

With asynchronous invocation, the application could devise a dispatching strategy which assigns updates to different (unrelated) web sessions to any available thread from the pool, but queues updates to the same session, and processes those by the same thread, to provide ordering of updates to the same session. This would speed up overall processing, as updates to a web session 1 on A don't have to wait until all updates to an unrelated web session 2 on A have been processed.
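One possible dispatching strategy is sketched below. It simplifies the idea described above: instead of picking any available thread, each session ID is hashed to one of a fixed number of single-threaded lanes, so unrelated sessions may occasionally share a lane. All names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical per-session dispatcher; a simplification, not JGroups code.
class SessionDispatcherSketch {
    private final ExecutorService[] lanes;

    SessionDispatcherSketch(int numLanes) {
        lanes = new ExecutorService[numLanes];
        for (int i = 0; i < numLanes; i++)
            lanes[i] = Executors.newSingleThreadExecutor();
    }

    // Updates to the same session always map to the same single-threaded
    // lane and therefore run in submission order; updates to sessions in
    // other lanes are processed in parallel.
    void dispatch(String sessionId, Runnable update) {
        int lane = Math.floorMod(sessionId.hashCode(), lanes.length);
        lanes[lane].execute(update);
    }

    // Stops accepting updates and waits for the queued ones to finish.
    boolean shutdownAndWait(long timeoutMs) {
        for (ExecutorService lane : lanes)
            lane.shutdown();
        try {
            for (ExecutorService lane : lanes)
                if (!lane.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS))
                    return false;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A handler would call dispatch() with the session ID extracted from the request and return immediately; the response is then sent from within the submitted update.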

This is similar to what the SCOPE protocol (see Section 7.14.4, “SCOPE”) tried to achieve.

This class was written as a demo of how state can be shared between nodes of a cluster. It has never been heavily tested and is therefore not meant to be used in production.

A ReplicatedHashMap uses a concurrent hashmap internally and allows several instances of the hashmap to be created in different processes. All of these instances have exactly the same state at all times. When creating such an instance, a cluster name determines which cluster of replicated hashmaps will be joined. The new instance will then query the state from existing members and update itself before starting to service requests. If there are no existing members, it will simply start with an empty state.

Modifications such as put(), clear() or remove() will be propagated in orderly fashion to all replicas. Read-only requests such as get() will only be invoked on the local hashmap.

Since both keys and values of a hashmap will be sent across the network, they have to be serializable. Putting a non-serializable value in the map will result in an exception at marshalling time.

A ReplicatedHashMap allows listeners to register for notifications, e.g. when data is added or removed. All listeners will get notified when such an event occurs. Notification is always local; for example in the case of removing an element, first the element is removed in all replicas, which then notify their listener(s) of the removal (after the fact).

ReplicatedHashMap allows members in a group to share common state across process and machine boundaries.
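The behaviour described in this section (state transfer on join, modifications applied on all replicas, local reads, local notification) can be modelled inside a single process. In the sketch below a shared list stands in for the cluster and the network; the class and the Listener interface are hypothetical and not the ReplicatedHashMap API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-process model; a shared list stands in for the cluster and the network.
class ReplicaSketch {
    // Hypothetical listener; notified locally by each replica.
    interface Listener {
        void entrySet(String key, String value);
        void entryRemoved(String key);
    }

    private final List<ReplicaSketch> cluster;
    private final Map<String, String> local = new ConcurrentHashMap<>();
    private final List<Listener> listeners = new ArrayList<>();

    ReplicaSketch(List<ReplicaSketch> cluster) {
        this.cluster = cluster;
        // State transfer: copy the state of an existing member, if there is one.
        if (!cluster.isEmpty())
            local.putAll(cluster.get(0).local);
        cluster.add(this);
    }

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Modifications are applied on every replica; each replica then
    // notifies its own listeners after the fact.
    void put(String key, String value) {
        for (ReplicaSketch replica : cluster) {
            replica.local.put(key, value);
            for (Listener listener : replica.listeners)
                listener.entrySet(key, value);
        }
    }

    void remove(String key) {
        for (ReplicaSketch replica : cluster) {
            replica.local.remove(key);
            for (Listener listener : replica.listeners)
                listener.entryRemoved(key);
        }
    }

    // Read-only requests only touch the local map.
    String get(String key) {
        return local.get(key);
    }
}
```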

ReplCache is a distributed cache which - contrary to ReplicatedHashMap - doesn't replicate its values to all cluster members, but just to selected backups.

A put(K,V,R) method has a replication count R which determines on how many cluster members key K and value V should be stored. When we have 10 cluster members, and R=3, then K and V will be stored on 3 members. If one of those members goes down, or leaves the cluster, then a different member will be told to store K and V. ReplCache tries to always have R cluster members store K and V.

A replication count of -1 means that a given key and value should be stored on all cluster members.

The mapping between a key K and the cluster member(s) on which K will be stored is always deterministic, and is computed using a consistent hash function.
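A generic consistent hashing scheme is sketched below to show why such a mapping is deterministic: every member which knows the same membership computes the same result. This is a textbook hash ring, not the hash function ReplCache actually ships with; member names are plain strings and all identifiers are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

// Textbook hash ring; not the hash function used by ReplCache.
class ConsistentHashSketch {
    // Ring positions (hash values) mapped to member names.
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    ConsistentHashSketch(List<String> members) {
        for (String member : members)
            ring.put(hash(member), member);
    }

    // Spreads the bits of hashCode(); any stable hash function would do.
    static int hash(Object obj) {
        int h = obj.hashCode();
        h = (h ^ (h >>> 16)) * 0x45d9f3b;
        return h ^ (h >>> 16);
    }

    // Walks the ring clockwise from the key's position and picks the first
    // 'count' distinct members; a count of -1 selects every member.
    List<String> pick(Object key, int count) {
        int wanted = (count < 0 || count > ring.size()) ? ring.size() : count;
        Set<String> picked = new LinkedHashSet<>();
        for (String member : ring.tailMap(hash(key), true).values()) {
            if (picked.size() == wanted) break;
            picked.add(member);
        }
        for (String member : ring.values()) { // wrap around the ring
            if (picked.size() == wanted) break;
            picked.add(member);
        }
        return new ArrayList<>(picked);
    }
}
```

Because the ring depends only on the set of members, two nodes which build it from the same view agree on where K is stored, regardless of the order in which they learned about the members.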

Note that this class was written as a demo of how state can be shared between nodes of a cluster. It has never been heavily tested and is therefore not meant to be used in production.

In 2.12, a new distributed locking service was added, replacing DistributedLockManager. The new service is implemented as a protocol and is used via org.jgroups.blocks.locking.LockService.

LockService talks to the locking protocol via events. The main abstraction of a distributed lock is an implementation of java.util.concurrent.locks.Lock. All lock methods are supported; however, conditions are not fully supported and still need some more testing (as of July 2011).

Below is an example of how LockService is typically used:



// locking.xml needs to have a locking protocol
JChannel ch=new JChannel("/home/bela/locking.xml");
LockService lock_service=new LockService(ch);
ch.connect("lock-cluster");
Lock lock=lock_service.getLock("mylock");
lock.lock();
try {
    // do something with the locked resource
}
finally {
    lock.unlock();
}
        

In the example, we create a channel, then a LockService, then connect the channel. If the channel's configuration doesn't include a locking protocol, an exception will be thrown. Then we grab a lock named "mylock", which we lock and subsequently unlock. If another member P had already acquired "mylock", we'd block until P released the lock, or P left the cluster or crashed.
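Because the distributed lock is an implementation of java.util.concurrent.locks.Lock, code can be written against that interface. The sketch below shows a helper which gives up after a timeout instead of blocking indefinitely; a ReentrantLock is used as a local stand-in so that the example runs without a cluster, and runLocked() is a hypothetical helper.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Works with any Lock implementation; ReentrantLock is only a stand-in.
class LockUsageSketch {
    // Runs the action while holding the lock, but gives up after the
    // timeout. Returns true if the action was run.
    static boolean runLocked(Lock lock, long timeoutMs, Runnable action) {
        boolean acquired;
        try {
            acquired = lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!acquired)
            return false;
        try {
            action.run();
            return true;
        } finally {
            lock.unlock(); // always release, even if the action throws
        }
    }

    public static void main(String[] args) {
        Lock lock = new ReentrantLock(); // stand-in for lock_service.getLock("mylock")
        boolean ran = runLocked(lock, 1000, () -> System.out.println("in critical section"));
        System.out.println("ran=" + ran);
    }
}
```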

Note that the owner of a lock is always a given thread in a cluster, so the owner is the JGroups address and the thread ID. This means that different threads inside the same JVM trying to access the same named lock will compete for it. If thread-22 grabs the lock first, then thread-5 will block until thread-22 releases the lock.
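The notion of ownership can be sketched as a value class which combines both parts. The class below is hypothetical (a String stands in for the JGroups address) and only illustrates why two threads on the same node count as different owners.

```java
import java.util.Objects;

// Hypothetical owner identity: node address plus thread ID.
class LockOwnerSketch {
    final String address;  // stand-in for a JGroups address
    final long   threadId;

    LockOwnerSketch(String address, long threadId) {
        this.address = address;
        this.threadId = threadId;
    }

    // Two owners are the same only if both address and thread ID match.
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof LockOwnerSketch))
            return false;
        LockOwnerSketch other = (LockOwnerSketch) obj;
        return address.equals(other.address) && threadId == other.threadId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(address, threadId);
    }

    @Override
    public String toString() {
        return address + "::" + threadId;
    }
}
```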

JGroups includes a demo (org.jgroups.demos.LockServiceDemo), which can be used to interactively experiment with distributed locks. LockServiceDemo -h dumps all command line options.

Currently (Jan 2011), there are 2 protocols which provide locking: Section 7.14.10.2, “PEER_LOCK” and Section 7.14.10.1, “CENTRAL_LOCK”. The locking protocol has to be placed at or towards the top of the stack (close to the channel).

The following scenario is susceptible to network partitioning and subsequent merging: we have a cluster view of {A,B,C,D} and then the cluster splits into {A,B} and {C,D}. Assume that B and D now acquire a lock "mylock". This is what happens (with the locking protocol being CENTRAL_LOCK):

There is no easy way (via the Lock API) to 'remove' the lock from D. We could for example simply release D's lock on "mylock", but then there's no way of telling D that the lock it holds is actually stale!

Therefore the recommended solution here is for nodes to listen to MergeView changes if they expect merging to occur, and re-acquire all of their locks after a merge, e.g.:



Lock l1, l2, l3;
LockService lock_service;
...
public void viewAccepted(View view) {
    if(view instanceof MergeView) {
        new Thread() {
            public void run() {
                lock_service.unlockAll();
                // stop all access to resources protected by l1, l2 or l3
                // every thread needs to re-acquire the locks it holds
            }
        }.start();
    }
}
            

In 2.12, a distributed execution service was added. The new service is implemented as a protocol and is used via org.jgroups.blocks.executor.ExecutionService.

ExecutionService extends java.util.concurrent.ExecutorService and distributes tasks submitted to it across the cluster, trying to distribute the tasks to the cluster members as evenly as possible. When a cluster member leaves or dies, the tasks it was processing are re-distributed to other members in the cluster.

ExecutionService talks to the executing protocol via events. The main abstraction is an implementation of java.util.concurrent.ExecutorService. All methods are supported. The restrictions are however that the Callable or Runnable must be Serializable, Externalizable or Streamable. Also the result produced from the future needs to be Serializable, Externalizable or Streamable. If the Callable or Runnable are not, then an IllegalArgumentException is immediately thrown. If a result is not, then a NotSerializableException with the name of the class will be returned to the Future as an exception cause.
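The serializability requirement can be checked locally before a task is submitted. The sketch below defines a task which is both Callable and Serializable, plus a hypothetical roundTrip() helper that serializes and deserializes an object with plain Java serialization, roughly as a transport would have to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

// Local check using Java serialization; roundTrip() is a hypothetical helper.
class SerializableTaskSketch {
    // A task which can be shipped to another node: Callable and Serializable.
    static class SumTask implements Callable<Integer>, Serializable {
        private static final long serialVersionUID = 1L;
        private final int from, to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        public Integer call() {
            int sum = 0;
            for (int i = from; i <= to; i++)
                sum += i;
            return sum;
        }
    }

    // Serializes and deserializes an object; fails with an
    // IllegalArgumentException if the object cannot be serialized.
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalArgumentException(
                "not serializable: " + obj.getClass().getName(), e);
        }
    }

    public static void main(String[] args) {
        SumTask copy = roundTrip(new SumTask(1, 10));
        System.out.println("result of the copied task: " + copy.call());
    }
}
```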

Below is an example of how ExecutionService is typically used:



// executing.xml needs to have an executing protocol
JChannel ch=new JChannel("/home/bela/executing.xml");
ExecutionService exec_service =new ExecutionService(ch);
ch.connect("exec-cluster");
Future<Value> future = exec_service.submit(new MyCallable());
try {
    Value value = future.get();
    // Do something with value
}
catch (InterruptedException e) {
    e.printStackTrace();
}
catch (ExecutionException e) {
    e.getCause().printStackTrace();
}
        

In the example, we create a channel, then an ExecutionService, then connect the channel. Then we submit our callable giving us a Future. Then we wait for the future to finish returning our value and do something with it. If any exception occurs we print the stack trace of that exception.

The ExecutionService follows the Producer-Consumer pattern very closely. The ExecutionService is used as the producer for this pattern. Therefore the service only passes tasks off to be handled and doesn't do anything with the actual invocation of those tasks. There is a separate class that was written specifically as a consumer, which can be run on any node of the cluster. This class is ExecutionRunner and implements java.lang.Runnable.

A user is required to run one or more instances of an ExecutionRunner on a node of the cluster. By having a thread run one of these runners, that thread has now volunteered to run any task that is submitted to the cluster via an ExecutionService. This allows any node in the cluster to participate or not participate in the running of these tasks, and any node can optionally run more than 1 ExecutionRunner if it has additional capacity to do so. A runner will run indefinitely until the thread that is currently running it is interrupted. If a task is running when the runner is interrupted, the task will be interrupted.
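The division of labour between producer and consumer can be illustrated within a single JVM. In the sketch below a BlockingQueue stands in for the cluster-wide distribution of tasks, and CompletableFuture for the returned future; RunnerSketch and submit() are hypothetical and not the ExecutionRunner or ExecutionService implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Single-JVM model of the pattern; a queue stands in for the cluster.
class RunnerSketch implements Runnable {
    private final BlockingQueue<Runnable> tasks;

    RunnerSketch(BlockingQueue<Runnable> tasks) {
        this.tasks = tasks;
    }

    // Consumer: takes and runs tasks until the running thread is interrupted.
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted())
                tasks.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    }

    // Producer: only enqueues the task; a runner thread executes it later.
    static <T> CompletableFuture<T> submit(BlockingQueue<Runnable> tasks, Callable<T> task) {
        CompletableFuture<T> future = new CompletableFuture<>();
        tasks.add(() -> {
            try {
                future.complete(task.call());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Thread runner = new Thread(new RunnerSketch(queue));
        runner.start();
        System.out.println("result: " + submit(queue, () -> 6 * 7).join());
        runner.interrupt(); // stops the runner
    }
}
```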

Below is an example of how simple it is to have a single node start and allow for 10 distributed tasks to be executed simultaneously on it:



int runnerCount = 10;
// executing.xml needs to have an executing protocol
JChannel ch=new JChannel("/home/bela/executing.xml");
ch.connect("exec-cluster");
ExecutionRunner runner = new ExecutionRunner(ch);
ExecutorService service = Executors.newFixedThreadPool(runnerCount);
for (int i = 0; i < runnerCount; ++i) {
   // If you want to stop the runner hold onto the future
   // and cancel with interrupt.
   service.submit(runner);
}
        

In the example, we create a channel, then connect the channel, then create an ExecutionRunner. Then we create a java.util.concurrent.ExecutorService that is used to start 10 threads, each of which runs the ExecutionRunner. This allows this node to have 10 threads actively accepting and working on requests submitted via any ExecutionService in the cluster.

Since an ExecutionService does not allow non-serializable class instances to be sent across as tasks, there are 2 utility classes provided to get around this problem. For users who are used to using a CompletionService with an Executor, there is an equivalent ExecutionCompletionService which provides the same functionality. It would have been preferable to allow the same ExecutorCompletionService to be used, but because its implementation uses a non-serializable object, ExecutionCompletionService was implemented to be used instead in conjunction with an ExecutorService. A second utility class was designed to help users submit tasks which use a non-serializable class. The Executions class contains a method serializableCallable, which allows a user to pass a constructor of a class that implements Callable, together with its arguments, and returns a Callable that, when run, automatically creates an object from the constructor, passing the provided arguments to it, then calls the call method on the object and returns its result as a normal callable would. All the arguments provided must still be serializable, as must the returned object, as detailed previously.
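The idea behind serializableCallable can be sketched with plain JDK reflection: instead of shipping the task object, ship the class, the constructor signature and the arguments, and build the task where it runs. The code below is an illustration under that assumption and is not the Executions implementation; ConstructorCallable and WordCounter are hypothetical names.

```java
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.concurrent.Callable;

// Hypothetical user task that is deliberately NOT Serializable.
class WordCounter implements Callable<Integer> {
    private final String text;

    public WordCounter(String text) {
        this.text = text;
    }

    @Override
    public Integer call() {
        return text.trim().split("\\s+").length;
    }
}

// Serializable wrapper: carries the class, constructor signature and arguments
// rather than the task instance itself.
class ConstructorCallable<V> implements Callable<V>, Serializable {
    private static final long serialVersionUID = 1L;
    private final Class<? extends Callable<V>> type;
    private final Class<?>[] parameterTypes;
    private final Object[] args; // every argument must itself be serializable

    ConstructorCallable(Constructor<? extends Callable<V>> ctor, Object... args) {
        this.type = ctor.getDeclaringClass();
        this.parameterTypes = ctor.getParameterTypes();
        this.args = args.clone();
    }

    @Override
    public V call() throws Exception {
        // Look the constructor up again, create the real task and delegate to it.
        Constructor<? extends Callable<V>> ctor = type.getDeclaredConstructor(parameterTypes);
        ctor.setAccessible(true);
        return ctor.newInstance(args).call();
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> task =
            new ConstructorCallable<>(WordCounter.class.getConstructor(String.class), "one two three");
        System.out.println(task.call()); // prints 3
    }
}
```

The wrapper is serializable because it only holds Class objects and the argument array, which is why the arguments and the result still have to be serializable even though the wrapped class is not.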

JGroups includes a demo (org.jgroups.demos.ExecutionServiceDemo), which can be used to interactively experiment with a distributed sort algorithm and performance. This is for demonstration purposes and performance should not be assumed to be better than local. ExecutionServiceDemo -h dumps all command line options.

Currently (July 2011), there is 1 protocol which provides executions: Section 7.14.11, “CENTRAL_EXECUTOR”. The executing protocol has to be placed at or towards the top of the stack (close to the channel).

Cluster wide counters provide named counters (similar to AtomicLong) which can be changed atomically. 2 nodes incrementing the same counter with initial value 10 will see 11 and 12 as results, respectively.
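The behaviour is the cluster-wide analogue of what java.util.concurrent.atomic.AtomicLong guarantees between threads of one JVM. As a purely local illustration (two threads stand in for two nodes; LocalCounterAnalogy is a hypothetical name and no JGroups class is involved):

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

class LocalCounterAnalogy {
    // Two threads increment a shared counter once each and record the value they saw.
    static long[] incrementTwice(long initial) throws InterruptedException {
        AtomicLong counter = new AtomicLong(initial);
        long[] seen = new long[2];
        Thread a = new Thread(() -> seen[0] = counter.incrementAndGet());
        Thread b = new Thread(() -> seen[1] = counter.incrementAndGet());
        a.start();
        b.start();
        a.join();
        b.join();
        Arrays.sort(seen); // which thread saw which value is not deterministic
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(incrementTwice(10))); // prints [11, 12]
    }
}
```

Neither thread can observe the same value as the other, and no increment is lost; which of the two sees 11 depends on scheduling.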

To create a named counter, the following steps have to be taken:

In the first step, we add COUNTER to the top of the protocol stack configuration:



<config>
    ...
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <COUNTER bypass_bundling="true" timeout="5000"/>
</config>
        

Configuration of the COUNTER protocol is described in Section 7.14.12, “COUNTER”.

Next, we create a CounterService, which is used to create and delete named counters:



ch=new JChannel(props);
CounterService counter_service=new CounterService(ch);
ch.connect("counter-cluster");
Counter counter=counter_service.getOrCreateCounter("mycounter", 1);
        

In the sample code above, we create a channel first, then create the CounterService referencing the channel. Then we connect the channel and finally create a new named counter "mycounter", with an initial value of 1. If the counter already exists, the existing counter will be returned and the initial value will be ignored.

CounterService doesn't consume any messages from the channel over which it is created; instead it grabs a reference to the COUNTER protocol and invokes methods on it directly. This has the advantage that CounterService is non-intrusive: many instances can be created over the same channel. CounterService even co-exists with other services which use the same mechanism, e.g. LockService or ExecutionService (see above).

The returned counter instance implements interface Counter:



package org.jgroups.blocks.atomic;
public interface Counter {
    public String getName();
    /**
     * Gets the current value of the counter
     * @return The current value
     */
    public long get();
    /**
     * Sets the counter to a new value
     * @param new_value The new value
     */
    public void set(long new_value);
    /**
     * Atomically updates the counter using a CAS operation
     *
     * @param expect The expected value of the counter
     * @param update The new value of the counter
     * @return True if the counter could be updated, false otherwise
     */
    public boolean compareAndSet(long expect, long update);
    /**
     * Atomically increments the counter and returns the new value
     * @return The new value
     */
    public long incrementAndGet();
    /**
     * Atomically decrements the counter and returns the new value
     * @return The new value
     */
    public long decrementAndGet();
    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public long addAndGet(long delta);
}
        

The design of COUNTER is described in detail in CounterService.

In a nutshell, in a cluster the current coordinator maintains a hashmap of named counters. Members send requests (increment, decrement etc) to it, and the coordinator atomically applies the requests and sends back responses.

The advantage of this centralized approach is that - regardless of the size of a cluster - every request has a constant execution cost, namely a network round trip.
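The coordinator-side state can be pictured as a map from counter names to atomic values. The following is a minimal single-JVM sketch under that assumption; CoordinatorSketch and its method names are hypothetical, and in COUNTER the requests arrive as messages from other members rather than as direct method calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical coordinator-side state: one atomic value per named counter.
class CoordinatorSketch {
    private final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    // Creates the counter on first use; an existing counter keeps its current value.
    long getOrCreate(String name, long initialValue) {
        return counters.computeIfAbsent(name, n -> new AtomicLong(initialValue)).get();
    }

    // Applies an "add delta" request atomically and returns the value for the response.
    long addAndGet(String name, long delta) {
        return lookup(name).addAndGet(delta);
    }

    // Applies a compare-and-set request atomically.
    boolean compareAndSet(String name, long expect, long update) {
        return lookup(name).compareAndSet(expect, update);
    }

    private AtomicLong lookup(String name) {
        AtomicLong counter = counters.get(name);
        if (counter == null) {
            throw new IllegalArgumentException("no such counter: " + name);
        }
        return counter;
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch();
        coordinator.getOrCreate("mycounter", 1);
        System.out.println(coordinator.addAndGet("mycounter", 1));       // prints 2
        System.out.println(coordinator.getOrCreate("mycounter", 100));   // prints 2
    }
}
```

Each request touches exactly one map entry, which is why its cost does not grow with the number of members.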

A crash or leaving of the coordinator is handled as follows. The coordinator maintains a version for every counter value. Whenever the counter value is changed, the version is incremented. For every request that modifies a counter, both the counter value and the version are returned to the requester. The requester caches all counter values and associated versions in its own local cache.

When the coordinator leaves or crashes, the next-in-line member becomes the new coordinator. It then starts a reconciliation phase, and discards all requests until the reconciliation phase has completed. The reconciliation phase solicits all members for their cached values and versions. To reduce traffic, the request also carries all version numbers with it.

Clients return values whose versions are higher than the ones shipped by the new coordinator. The new coordinator waits for responses from all members, or for at most timeout milliseconds. Then it updates its own hashmap with values whose versions are higher than its own. Finally, it stops discarding requests and sends a resend message to all clients so that they resend any requests that might be pending.
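The version comparison at the heart of the reconciliation phase can be sketched as follows. This is an illustration of the rule described above, not the COUNTER code: ReconciliationSketch, Versioned and the method names are hypothetical, and networking, timeouts and request discarding are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReconciliationSketch {
    // A counter value together with the version assigned by the coordinator.
    static final class Versioned {
        final long value;
        final long version;

        Versioned(long value, long version) {
            this.value = value;
            this.version = version;
        }
    }

    // New coordinator: the version numbers it ships with its reconciliation request.
    static Map<String, Long> versionsOf(Map<String, Versioned> own) {
        Map<String, Long> versions = new HashMap<>();
        for (Map.Entry<String, Versioned> e : own.entrySet()) {
            versions.put(e.getKey(), e.getValue().version);
        }
        return versions;
    }

    // Client: reply only with cached entries that are newer than what was shipped.
    static Map<String, Versioned> reply(Map<String, Versioned> cache, Map<String, Long> shipped) {
        Map<String, Versioned> newer = new HashMap<>();
        for (Map.Entry<String, Versioned> e : cache.entrySet()) {
            Long known = shipped.get(e.getKey());
            if (known == null || e.getValue().version > known) {
                newer.put(e.getKey(), e.getValue());
            }
        }
        return newer;
    }

    // New coordinator: adopt every replied entry whose version is higher than its own.
    static void merge(Map<String, Versioned> own, List<Map<String, Versioned>> replies) {
        for (Map<String, Versioned> reply : replies) {
            for (Map.Entry<String, Versioned> e : reply.entrySet()) {
                own.merge(e.getKey(), e.getValue(),
                          (mine, theirs) -> theirs.version > mine.version ? theirs : mine);
            }
        }
    }
}
```

For example, if the new coordinator holds counter "a" at version 2 and one client has cached it at version 3, only that client's entry is returned and adopted; a client whose cache is at version 1 replies with nothing for "a".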

There's another edge case that also needs to be covered: if a client P updates a counter, and both P and the coordinator crash, then the update is lost. To reduce the chances of this happening, COUNTER can be configured to replicate all counter changes to one or more backup coordinators. The num_backups property defines the number of such backups; 0 disables this. Whenever a counter is changed in the current coordinator, the coordinator also updates the backups (asynchronously).