This chapter describes the most frequently used protocols and their configuration. Ergonomics (Section 5.15, “Ergonomics”) strives to reduce the number of properties that have to be configured by dynamically adjusting them at run time; however, this is not yet in place.
Meanwhile, we recommend that users copy one of the predefined configurations shipped with JGroups, e.g. udp.xml or tcp.xml, and make only minimal changes to it.
This section is work in progress; we strive to update the documentation as we make changes to the code.
The table below lists properties that are available in all protocols, as they're defined in the superclass of all protocols, org.jgroups.stack.Protocol.
Table 7.1. Properties of org.jgroups.stack.Protocol
Name | Description |
---|---|
stats | Whether the protocol should collect protocol-specific runtime statistics. What those statistics are (or whether they even exist) depends on the particular protocol. See the org.jgroups.stack.Protocol javadoc for the available API related to statistics. Default is true. |
ergonomics | Turns on ergonomics. See Section 5.15, “Ergonomics” for details. |
id | Gives the protocol a different ID if needed so we can have multiple instances of it in the same stack |
TP is the base class for all transports, e.g. UDP and TCP. All of the properties defined here are inherited by the subclasses. The properties for TP are:
Table 7.2. Properties
Name | Description |
---|---|
bind_addr | The bind address which should be used by this transport. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL, NON_LOOPBACK, match-interface, match-host, match-address |
bind_interface_str | The interface (NIC) which should be used by this transport |
bind_port | The port to which the transport binds. Default of 0 binds to any (ephemeral) port |
bundler_capacity | The max number of elements in a bundler if the bundler supports size limitations |
bundler_type | The type of bundler used. Has to be "old" or "new" (default) |
diagnostics_addr | Address for diagnostic probing. Default is 224.0.75.75 |
diagnostics_bind_interfaces | Comma delimited list of interfaces (IP addresses or interface names) that the diagnostics multicast socket should bind to |
diagnostics_passcode | Authorization passcode for diagnostics. If specified every probe query will be authorized |
diagnostics_port | Port for diagnostic probing. Default is 7500 |
diagnostics_ttl | TTL of the diagnostics multicast socket |
discard_incompatible_packets | Discard packets with a different version if true |
enable_batching | Allows the transport to pass received message batches up as MessageBatch instances (up(MessageBatch)), rather than individual messages. This flag will be removed in a future version when batching has been implemented by all protocols |
enable_bundling | Enable bundling of smaller messages into bigger ones. Default is true |
enable_diagnostics | Switch to enable diagnostic probing. Default is true |
enable_unicast_bundling | Enable bundling of smaller messages into bigger ones for unicast messages. Default is true |
external_addr | Use "external_addr" if you have hosts on different networks, behind firewalls. On each firewall, set up a port forwarding rule (sometimes called "virtual server") to the local IP (e.g. 192.168.1.100) of the host; then, on each host, set the "external_addr" TCP transport parameter to the external (public IP) address of the firewall. |
external_port | Used to map the internal port (bind_port) to an external port. Only used if > 0 |
internal_thread_pool_enabled | Switch for enabling thread pool for internal messages |
internal_thread_pool_keep_alive_time | Timeout in ms to remove idle threads from the internal pool |
internal_thread_pool_max_threads | Maximum thread pool size for the internal thread pool |
internal_thread_pool_min_threads | Minimum thread pool size for the internal thread pool |
internal_thread_pool_queue_enabled | Queue to enqueue incoming internal messages |
internal_thread_pool_queue_max_size | Maximum queue size for incoming internal messages |
internal_thread_pool_rejection_policy | Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run |
log_discard_msgs | whether or not warnings about messages from different groups are logged |
log_discard_msgs_version | whether or not warnings about messages from members with a different version are logged |
logical_addr_cache_expiration | Time (in ms) after which entries in the logical address cache marked as removable are removed |
logical_addr_cache_max_size | Max number of elements in the logical address cache before eviction starts |
loopback | Messages to self are looped back immediately if true |
max_bundle_size | Maximum number of bytes for messages to be queued until they are sent |
max_bundle_timeout | Max number of milliseconds until queued messages are sent |
oob_thread_pool.keep_alive_time | Timeout in ms to remove idle threads from the OOB pool |
oob_thread_pool.max_threads | Max thread pool size for the OOB thread pool |
oob_thread_pool.min_threads | Minimum thread pool size for the OOB thread pool |
oob_thread_pool_enabled | Switch for enabling thread pool for OOB messages. Default=true |
oob_thread_pool_queue_enabled | Use queue to enqueue incoming OOB messages |
oob_thread_pool_queue_max_size | Maximum queue size for incoming OOB messages |
oob_thread_pool_rejection_policy | Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run |
physical_addr_max_fetch_attempts | Max number of attempts to fetch a physical address (when not in the cache) before giving up |
port_range | The range of valid ports, from bind_port to end_port. 0 only binds to bind_port and fails if taken |
receive_interfaces | Comma delimited list of interfaces (IP addresses or interface names) to receive multicasts on |
receive_on_all_interfaces | If true, the transport should use all available interfaces to receive multicast messages |
singleton_name | If assigned enable this transport to be a singleton (shared) transport |
suppress_time_different_cluster_warnings | Time during which identical warnings about messages from a member from a different cluster will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this. |
suppress_time_different_version_warnings | Time during which identical warnings about messages from a member with a different version will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this. |
thread_naming_pattern | Thread naming pattern for threads in this channel. Valid values are "pcl": "p": includes the thread name, e.g. "Incoming thread-1", "UDP ucast receiver", "c": includes the cluster name, e.g. "MyCluster", "l": includes the local address of the current member, e.g. "192.168.5.1:5678" |
thread_pool.keep_alive_time | Timeout in milliseconds to remove idle thread from regular pool |
thread_pool.max_threads | Maximum thread pool size for the regular thread pool |
thread_pool.min_threads | Minimum thread pool size for the regular thread pool |
thread_pool_enabled | Switch for enabling thread pool for regular messages |
thread_pool_queue_enabled | Queue to enqueue incoming regular messages |
thread_pool_queue_max_size | Maximum queue size for incoming regular messages |
thread_pool_rejection_policy | Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run |
tick_time | Tick duration in the HashedTimingWheel timer. Only applicable if timer_type is "wheel" |
timer.keep_alive_time | Timeout in ms to remove idle threads from the timer pool |
timer.max_threads | Max thread pool size for the timer thread pool |
timer.min_threads | Minimum thread pool size for the timer thread pool |
timer_queue_max_size | Max number of elements on a timer queue |
timer_rejection_policy | Timer rejection policy. Possible values are Abort, Discard, DiscardOldest and Run |
timer_type | Type of timer to be used. Valid values are "old" (DefaultTimeScheduler, used up to 2.10), "new" or "new2" (TimeScheduler2), "new3" (TimeScheduler3) and "wheel". Note that this property might disappear in future releases, if one of the 3 timers is chosen as default timer |
wheel_size | Number of ticks in the HashedTimingWheel timer. Only applicable if timer_type is "wheel" |
who_has_cache_timeout | Timeout (in ms) to determine how long to wait until a request to fetch the physical address for a given logical address will be sent again. Subsequent requests for the same physical address will therefore be spaced at least who_has_cache_timeout ms apart |
bind_addr can be set to the address of a network interface, e.g. 192.168.1.5. It can also be set for the entire stack using the system property -Djgroups.bind_addr, which overrides the XML value (if given).
The following special values are also recognized for bind_addr:
GLOBAL: Picks a global IP address if available. If not, falls back to a SITE_LOCAL IP address.
SITE_LOCAL: Picks a site-local (non-routable) IP address, e.g. from the 192.168.0.0 or 10.0.0.0 address ranges.
LINK_LOCAL: Picks a link-local IP address, from 169.254.1.0 through 169.254.254.255.
NON_LOOPBACK: Picks any non-loopback address.
LOOPBACK: Picks a loopback address, e.g. 127.0.0.1.
match-interface: Picks an address which matches a pattern against the interface name, e.g. match-interface:eth.*
match-address: Picks an address which matches a pattern against the host address, e.g. match-address:192.168.*
match-host: Picks an address which matches a pattern against the host name, e.g. match-host:linux.*
An example of setting the bind address in UDP to use a site local address is:
<UDP bind_addr="SITE_LOCAL" />
This will pick any address of any interface that's site-local, e.g. a 192.168.x.x or 10.x.x.x address.
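The pattern-based values work the same way; for example, to bind to the first address on an interface whose name matches eth.* (the interface name is illustrative):
<UDP bind_addr="match-interface:eth.*" />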
UDP uses IP multicast for sending messages to all members of a group and UDP datagrams for unicast messages (sent to a single member). When started, it opens a unicast and multicast socket: the unicast socket is used to send/receive unicast messages, whereas the multicast socket sends and receives multicast messages. The channel's physical address will be the address and port number of the unicast socket.
A protocol stack with UDP as transport protocol is typically used with clusters whose members run in the same subnet. If running across subnets, an admin has to ensure that IP multicast is enabled across subnets. It is often the case that IP multicast is not enabled across subnets. In such cases, the stack has to either use UDP without IP multicasting or other transports such as TCP.
Table 7.3. Properties
Name | Description |
---|---|
disable_loopback | If true, disables IP_MULTICAST_LOOP on the MulticastSocket (for sending and receiving of multicast packets). IP multicast packets sent on a host P will therefore not be received by anyone on P. Use with caution. |
ip_mcast | Multicast toggle. If false multiple unicast datagrams are sent instead of one multicast. Default is true |
ip_ttl | The time-to-live (TTL) for multicast datagram packets. Default is 8 |
max_bundle_size | Maximum number of bytes for messages to be queued until they are sent |
mcast_group_addr | The multicast address used for sending and receiving packets. Default is 228.8.8.8 |
mcast_port | The multicast port used for sending and receiving packets. Default is 7600 |
mcast_recv_buf_size | Receive buffer size of the multicast datagram socket. Default is 500'000 bytes |
mcast_send_buf_size | Send buffer size of the multicast datagram socket. Default is 100'000 bytes |
tos | Traffic class for sending unicast and multicast datagrams. Default is 8 |
ucast_recv_buf_size | Receive buffer size of the unicast datagram socket. Default is 64'000 bytes |
ucast_send_buf_size | Send buffer size of the unicast datagram socket. Default is 100'000 bytes |
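For illustration, here is a sketch of a UDP transport that sets the multicast group, port and TTL explicitly (the values shown are the documented defaults, so this behaves like a plain <UDP/>):
<UDP bind_addr="SITE_LOCAL" mcast_group_addr="228.8.8.8" mcast_port="7600" ip_ttl="8" />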
Specifying TCP in your protocol stack tells JGroups to use TCP to send messages between cluster members. Instead of using a multicast bus, the cluster members create a mesh of TCP connections.
For example, while UDP sends 1 IP multicast packet when sending a message to a cluster of 10 members, TCP needs to send the message 9 times. It sends the same message to the first member, to the second member, and so on (excluding itself as the message is looped back internally).
This is slow, as the cost of sending a group message is O(n) with TCP, where it is O(1) with UDP. As the cost of sending a group message with TCP is a function of the cluster size, it becomes higher with larger clusters.
We recommend using UDP for larger clusters whenever possible.
Table 7.4. Properties
Name | Description |
---|---|
client_bind_addr | The address of a local network interface which should be used by client sockets to bind to. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK |
client_bind_port | The local port a client socket should bind to. If 0, an ephemeral port will be picked. |
conn_expire_time | Max time connection can be idle before being reaped (in ms) |
defer_client_bind_addr | If true, client sockets will not explicitly bind to bind_addr but will defer to the native socket |
linger | SO_LINGER in msec. Default of -1 disables it |
peer_addr_read_timeout | Max time to block on reading of peer address |
reaper_interval | Reaper interval in msec. Default is 0 (no reaping) |
recv_buf_size | Receiver buffer size in bytes |
send_buf_size | Send buffer size in bytes |
send_queue_size | Max number of messages in a send queue |
sock_conn_timeout | Max time allowed for a socket creation in connection table |
tcp_nodelay | Should TCP no delay flag be turned on |
use_send_queues | Should separate send queues be used for each connection |
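For illustration, a sketch of a TCP transport (the address and ports are placeholders); bind_port and port_range are inherited from TP:
<TCP bind_addr="192.168.1.5" bind_port="7800" port_range="10" sock_conn_timeout="300" />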
TUNNEL was described in Section 5.3.4, “TUNNEL”.
Table 7.5. Properties (experimental)
Name | Description |
---|---|
gossip_router_hosts | A comma-separated list of GossipRouter hosts, e.g. HostA[12001],HostB[12001] |
reconnect_interval | Interval in msec to attempt connecting back to router in case of torn connection. Default is 5000 msec |
tcp_nodelay | Should TCP no delay flag be turned on |
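A configuration sketch using the gossip_router_hosts format from the table (host names are placeholders):
<TUNNEL gossip_router_hosts="HostA[12001],HostB[12001]" reconnect_interval="5000" />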
The task of discovery is to find an initial membership, which is used to determine the current coordinator. Once a coordinator is found, the joiner sends a JOIN request to the coordinator.
Discovery is also called periodically by MERGE2 (see Section 7.4.1, “MERGE2”), to see if we have diverging cluster membership information.
Discovery is the superclass for all discovery protocols, and therefore its properties below can be used in any subclass.
Discovery sends a discovery request and waits for num_initial_members discovery responses, or timeout ms, whichever occurs first, before returning. Note that break_on_coord_rsp="true" will return as soon as we have a response from a coordinator.
Table 7.6. Properties
Name | Description |
---|---|
break_on_coord_rsp | Return from the discovery phase as soon as we have 1 coordinator response |
force_sending_discovery_rsps | Always sends a discovery response, no matter what |
num_initial_members | Minimum number of initial members to get a response from |
num_initial_srv_members | Minimum number of server responses (PingData.isServer()=true). If this value is greater than 0, we'll ignore num_initial_members |
return_entire_cache | Whether or not to return the entire logical-physical address cache mappings on a discovery request |
stagger_timeout | If greater than 0, we'll wait a random number of milliseconds in range [0..stagger_timeout] before sending a discovery response. This prevents traffic spikes in large clusters when everyone sends their discovery response at the same time |
timeout | Timeout to wait for the initial members |
use_disk_cache | If a persistent disk cache (PDC) is present, combine the discovery results with the contents of the disk cache before returning the results |
Initial (dirty) discovery of members. Used to detect the coordinator (oldest member), by mcasting PING requests to an IP multicast address.
Each member responds with a packet {C, A}, where C=coordinator's address and A=own address. After N milliseconds or M replies, the joiner determines the coordinator from the responses, and sends a JOIN request to it (handled by GMS). If nobody responds, we assume we are the first member of a group.
Unlike TCPPING, PING employs dynamic discovery, meaning that the member does not have to know in advance where other cluster members are.
PING uses the IP multicasting capabilities of the transport to send a discovery request to the cluster. It therefore requires UDP as transport.
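Since PING inherits the Discovery properties above, a minimal sketch looks like this (the values are illustrative):
<PING timeout="3000" num_initial_members="3" break_on_coord_rsp="true" />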
TCPPING is used with TCP as transport, and uses a static list of cluster members' addresses. See Section 5.3.3.1, “Using TCP and TCPPING” for details.
Table 7.7. Properties
Name | Description |
---|---|
initial_hosts | Comma delimited list of hosts to be contacted for initial membership |
max_dynamic_hosts | max number of hosts to keep beyond the ones in initial_hosts |
port_range | Number of additional ports to be probed for membership. A port_range of 0 does not probe additional ports. Example: initial_hosts=A[7800] port_range=0 probes A:7800, port_range=1 probes A:7800 and A:7801 |
It is recommended to include the addresses of all cluster members in initial_hosts.
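Putting these properties together, a sketch (host names and ports are placeholders):
<TCPPING initial_hosts="HostA[7800],HostB[7800]" port_range="1" timeout="3000" num_initial_members="2" />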
TCPGOSSIP uses an external GossipRouter to discover the members of a cluster. See Section 5.3.3.2, “Using TCP and TCPGOSSIP” for details.
Table 7.8. Properties
Name | Description |
---|---|
initial_hosts | Comma delimited list of hosts to be contacted for initial membership |
reconnect_interval | Interval (ms) by which a disconnected stub attempts to reconnect to the GossipRouter |
sock_conn_timeout | Max time for socket creation. Default is 1000 msec |
sock_read_timeout | Max time in milliseconds to block on a read. 0 blocks forever |
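A sketch, assuming GossipRouters run on HostA and HostB at port 12001:
<TCPGOSSIP initial_hosts="HostA[12001],HostB[12001]" reconnect_interval="10000" />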
MPING (=Multicast PING) uses IP multicast to discover the initial membership. It can be used with all transports, but is usually used in combination with TCP. TCP usually requires TCPPING, which has to list all cluster members explicitly, but MPING doesn't have this requirement. The typical use case is when we want TCP as transport, but multicasting for discovery, so we don't have to define a static list of initial hosts in TCPPING.
MPING uses its own multicast socket for discovery. The properties bind_addr (which can also be set via -Djgroups.bind_addr=), mcast_addr and mcast_port can be used to configure it.
Note that MPING requires a separate thread listening on the multicast socket for discovery requests.
Table 7.9. Properties
Name | Description |
---|---|
bind_addr | Bind address for multicast socket. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK |
bind_interface_str | The interface (NIC) which should be used by this transport |
ip_ttl | Time to live for discovery packets. Default is 8 |
mcast_addr | Multicast address to be used for discovery |
mcast_port | Multicast port for discovery packets. Default is 7555 |
receive_interfaces | List of interfaces to receive multicasts on |
receive_on_all_interfaces | If true, the transport should use all available interfaces to receive multicast messages. Default is false |
send_interfaces | List of interfaces to send multicasts on |
send_on_all_interfaces | Whether send messages are sent on all interfaces. Default is false |
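For illustration, the discovery part of a TCP-based stack using MPING might look like this (the multicast address is a placeholder):
<TCP bind_port="7800" />
<MPING mcast_addr="230.1.2.3" mcast_port="7555" ip_ttl="8" />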
FILE_PING can be used instead of GossipRouter in cases where no external process is desired.
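A minimal sketch, assuming the shared directory (e.g. an NFS mount visible to all members) is given via the location property:
<FILE_PING location="/mnt/shared/jgroups" />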
JDBC_PING is an alternative to S3_PING, using a shared database such as Amazon RDS instead of S3.
Table 7.10. Properties
Name | Description |
---|---|
connection_driver | The JDBC connection driver name |
connection_password | The JDBC connection password |
connection_url | The JDBC connection URL |
connection_username | The JDBC connection username |
datasource_jndi_name | To use a DataSource registered in JNDI, specify the JNDI name here. This is an alternative to all connection_* configuration options: if this property is not empty, then all connection related properties must be empty. |
delete_single_sql | SQL used to delete a row. Customizable, but keep the order of parameters and pick compatible types: 1) Own Address, as String 2) Cluster name, as String |
initialize_sql | If not empty, this SQL statement will be performed at startup. Customize it to create the needed table on those databases which permit table creation attempts without losing data, such as PostgreSQL and MySQL (using IF NOT EXISTS). To allow for creation attempts, errors performing this statement will be logged but not considered fatal. To avoid any DDL operation, set this to an empty string. |
insert_single_sql | SQL used to insert a new row. Customizable, but keep the order of parameters and pick compatible types: 1) Own Address, as String 2) Cluster name, as String 3) Serialized PingData as byte[] |
select_all_pingdata_sql | SQL used to fetch all nodes' PingData. Customizable, but keep the order of parameters and pick compatible types: only one parameter needed, String compatible, representing the cluster name. Must return a byte[], the serialized PingData as it was stored by the insert_single_sql statement |
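A sketch using the connection_* properties; the URL, driver and credentials are placeholders for your own database:
<JDBC_PING connection_url="jdbc:mysql://localhost/jgroups"
           connection_username="jgroups" connection_password="secret"
           connection_driver="com.mysql.jdbc.Driver" />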
BPING uses UDP broadcasts to discover other nodes. The default broadcast address (dest) is 255.255.255.255, and should be replaced with a subnet-specific broadcast, e.g. 192.168.1.255.
Table 7.11. Properties (experimental)
Name | Description |
---|---|
bind_port | Port for discovery packets |
dest | Target address for broadcasts. This should be restricted to the local subnet, e.g. 192.168.1.255 |
port_range | Sends discovery packets to ports 8555 to (8555+port_range) |
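A sketch matching the recommendation above (the subnet broadcast address is an example):
<BPING dest="192.168.1.255" bind_port="8555" port_range="10" />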
RACKSPACE_PING uses Rackspace Cloud Files Storage to discover initial members. Each node writes a small object in a shared Rackspace container. New joiners read all addresses from the container and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding object.
These objects are stored under a container called 'jgroups', and each node will write an object named after the cluster name, plus a "/" followed by the address, thus simulating a hierarchical structure.
Table 7.12. Properties
Name | Description |
---|---|
apiKey | Rackspace API access key |
container | Name of the root container |
region | Rackspace region, either UK or US |
username | Rackspace username |
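A configuration sketch (username and API key are placeholders):
<RACKSPACE_PING username="myuser" apiKey="my_api_key" region="US" container="jgroups" />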
S3_PING uses Amazon S3 to discover initial members. New joiners read all addresses from this bucket and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding file.
It's designed specifically for members running on Amazon EC2, where multicast traffic is not allowed and thus MPING or PING will not work. When Amazon RDS is preferred over S3, or if a shared database is used, an alternative is to use JDBC_PING.
Each instance uploads a small file to an S3 bucket and each instance reads the files out of this bucket to determine the other members.
There are three different ways to use S3_PING, each having its own tradeoffs between security and ease of use: private buckets with AWS credentials given to each instance, public buckets with no credentials, and public readable buckets with pre-signed URLs. These are described in more detail below.
Pre-signed URLs are the most secure method, since writing to buckets still requires authorization and you don't have to pass Amazon AWS credentials to every instance. However, they are also the most complex to set up.
Here's a configuration example for private buckets with credentials given to each instance:
<S3_PING location="my_bucket" access_key="access_key"
secret_access_key="secret_access_key" timeout="2000"
num_initial_members="3"/>
Here's an example for public buckets with no credentials:
<S3_PING location="my_bucket"
timeout="2000" num_initial_members="3"/>
And finally, here's an example for public readable buckets with pre-signed URLs:
<S3_PING pre_signed_put_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&Expires=1316276200&Signature=it1cUUtgCT9ZJyCJDj2xTAcRTFg%3D"
pre_signed_delete_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&Expires=1316276200&Signature=u4IFPRq%2FL6%2FAohykIW4QrKjR23g%3D"
timeout="2000" num_initial_members="3"/>
Table 7.13. Properties
Name | Description |
---|---|
access_key | The access key to AWS (S3) |
pre_signed_delete_url | When non-null, we use this pre-signed URL for DELETEs |
pre_signed_put_url | When non-null, we use this pre-signed URL for PUTs |
prefix | When non-null, we set location to prefix-UUID |
secret_access_key | The secret access key to AWS (S3) |
SWIFT_PING uses Openstack Swift to discover initial members. Each node writes a small object in a shared container. New joiners read all addresses from the container and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding object.
These objects are stored under a container called 'jgroups' (by default), and each node will write an object named after the cluster name, plus a "/" followed by the address, thus simulating a hierarchical structure.
Currently only Openstack Keystone authentication is supported. Here is a sample configuration block:
<SWIFT_PING timeout="2000"
num_initial_members="3"
auth_type="keystone_v_2_0"
auth_url="http://localhost:5000/v2.0/tokens"
username="demo"
password="password"
tenant="demo" />
Table 7.14. Properties (experimental)
Name | Description |
---|---|
auth_type | Authentication type |
auth_url | Authentication url |
container | Name of the root container |
password | Password |
tenant | Openstack Keystone tenant name |
username | Username |
This is a protocol written by Meltmedia, which uses the AWS API. It is not part of JGroups, but can be downloaded at https://meltmedia.github.com/jgroups-aws.
The Persistent Discovery Cache can be used to cache the results of the discovery process persistently. E.g. if we have TCPPING.initial_hosts configured to include only members A and B, but have a lot more members, then other members can bootstrap themselves and find the right coordinator even when neither A nor B are running.
An example of a TCP-based stack configuration is:
<TCP />
<PDC cache_dir="/tmp/jgroups" />
<TCPPING timeout="2000" num_initial_members="20"
initial_hosts="192.168.1.5[7000]" port_range="0"
return_entire_cache="true"
use_disk_cache="true" />
Table 7.15. Properties
Name | Description |
---|---|
cache_dir | The absolute path of the directory for the disk cache. The mappings will be stored as individual files in this directory |
If a cluster gets split for some reason (e.g. a network partition), this protocol merges the subclusters back into one cluster. It is only run by the coordinator (the oldest member in a cluster), which periodically multicasts its presence and view information. If another coordinator (for the same cluster) receives this message, it will initiate a merge process. Note that this merges subgroups {A,B} and {C,D,E} back into {A,B,C,D,E}, but it does not merge state. The application has to handle the callback to merge state. See Section 5.6, “Handling network partitions” for suggestions on merging states.
Following a merge, the coordinator of the merged group can shift from the typical case of "the coordinator is the member who has been up the longest." During the merge process, the coordinators of the various subgroups need to reach a common decision as to who the new coordinator is. In order to ensure a consistent result, each coordinator combines the addresses of all the members in a list and then sorts the list. The first member in the sorted list becomes the coordinator. The sort order is determined by how the address implements the Comparable interface; JGroups then compares based on the UUID.
So, take a hypothetical case where two machines were running, with one machine running three separate cluster members and the other running two. If communication between the machines were cut, the following subgroups would form:
{A,B} and {C,D,E}
Following the merge, the new view would be {C,D,A,B,E}, with C being the new coordinator.
Note that "A", "B" and so on are just logical names, attached to UUIDs, but the actual sorting is done on the actual UUIDs.
Table 7.16. Properties
Name | Description |
---|---|
inconsistent_view_threshold | Number of inconsistent views with only 1 coord after a MERGE event is sent up |
max_interval | Maximum time in ms between runs to discover other clusters |
merge_fast | When receiving a multicast message, checks if the sender is member of the cluster. If not, initiates a merge. Generates a lot of traffic for large clusters when there is a lot of merging |
merge_fast_delay | The delay (in milliseconds) after which a merge fast execution is started |
min_interval | Minimum time in ms between runs to discover other clusters |
MERGE3 was added in JGroups 3.1.
In MERGE3, all members periodically send an INFO message with their address (UUID), logical name, physical address and ViewId. The ViewId (Section 3.7.1, “ViewId”) is used to see if we have diverging views among the cluster members: periodically, every coordinator looks at the INFO messages received so far and checks if there are any inconsistencies.
When inconsistencies are found, the merge leader will be the member with the lowest address (UUID). This is deterministic, so at most one merge should be going on at any given time.
The merge leader then asks the senders of the inconsistent ViewIds for their full views. Once received, it simply passes a MERGE event up the stack, where the merge will be handled (by GMS) in exactly the same way as if MERGE2 had generated the MERGE event.
The advantages of MERGE3 compared to MERGE2 are lower traffic (small periodic INFO messages rather than full discovery rounds) and a deterministic merge leader, which avoids concurrent merges.
Table 7.17. Properties
Name | Description |
---|---|
max_interval | Interval (in milliseconds) when the next info message will be sent. A random value is picked from range [1..max_interval] |
max_participants_in_merge | The max number of merge participants to be involved in a merge. 0 sets this to unlimited. |
min_interval | Minimum time in ms before sending an info message |
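A sketch of the two interval properties (the values are illustrative):
<MERGE3 min_interval="10000" max_interval="30000" />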
The task of failure detection is to probe members of a group and see whether they are alive. When a member is suspected (= deemed dead), then a SUSPECT message is sent to all nodes of the cluster. It is not the task of the failure detection layer to exclude a crashed member (this is done by the group membership protocol, GMS), but simply to notify everyone that a node in the cluster is suspected of having crashed.
The SUSPECT message is handled by the GMS protocol of the current coordinator only; all other members ignore it.
Failure detection based on heartbeat messages. If a reply is not received within timeout ms, after max_tries attempts, a member is declared suspected and will be excluded by GMS.
Each member sends a message containing a "FD" - HEARTBEAT header to its neighbor to the right (identified by the ping_dest address). The heartbeats are sent by the inner class Monitor. When the neighbor receives the HEARTBEAT, it replies with a message containing a "FD" - HEARTBEAT_ACK header. The first member watches for "FD" - HEARTBEAT_ACK replies from its neighbor. For each received reply, it resets the last_ack timestamp (sets it to the current time) and the num_tries counter (sets it to 0). The same Monitor instance that sends heartbeats watches the difference between the current time and last_ack. If this difference grows beyond timeout, the Monitor cycles several more times (until max_tries is reached) and then sends a SUSPECT message for the neighbor's address. The SUSPECT message is sent down the stack, is addressed to all members, and is sent as a regular message with a FdHeader.SUSPECT header.
Table 7.18. Properties
Name | Description |
---|---|
max_tries | Number of times to send an are-you-alive message |
msg_counts_as_heartbeat | Treat messages received from members as heartbeats. Note that this means we're updating a value in a hashmap every time a message is passing up the stack through FD, which is costly. |
timeout | Timeout to suspect a node P if neither a heartbeat nor data were received from P. |
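For example, with the sketch below a member is suspected after roughly timeout * max_tries = 9 seconds without heartbeat acks or traffic:
<FD timeout="3000" max_tries="3" />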
Failure detection based on simple heartbeat protocol. Every member periodically multicasts a heartbeat. Every member also maintains a table of all members (minus itself). When data or a heartbeat from P are received, we reset the timestamp for P to the current time. Periodically, we check for expired members, and suspect those.
Example: <FD_ALL interval="3000" timeout="10000"/>
In the example above, we send a heartbeat every 3 seconds and suspect members if we haven't received a heartbeat (or traffic) for more than 10 seconds. Note that since we check the timestamps every 'interval' milliseconds, we will suspect a member after roughly 4 * 3s == 12 seconds. If we set the timeout to 8500, then we would suspect a member after 3 * 3 secs == 9 seconds.
Table 7.19. Properties
Name | Description |
---|---|
interval | Interval at which a HEARTBEAT is sent to the cluster |
msg_counts_as_heartbeat | Treat messages received from members as heartbeats. Note that this means we're updating a value in a hashmap every time a message is passing up the stack through FD_ALL, which is costly. Default is false |
timeout | Timeout after which a node P is suspected if neither a heartbeat nor data were received from P |
timeout_check_interval | Interval at which the HEARTBEAT timeouts are checked |
Failure detection protocol based on a ring of TCP sockets created between cluster members. Each member in a cluster connects to its neighbor (the last member connects to the first), thus forming a ring. Member B is suspected when its neighbor A detects an abnormal closing of its TCP socket (presumably due to a crash of B). However, if member B is about to leave gracefully, it lets its neighbor A know, so that A does not suspect it.
If you are using a multi-NIC machine, note that JGroups versions prior to 2.2.8 have an FD_SOCK implementation that does not account for this possibility. The JVM can therefore select a NIC that is unreachable from its neighbor and set up the FD_SOCK server socket on it. The neighbor would be unable to connect to that server socket, resulting in the member being suspected immediately. The suspected member is kicked out of the group, tries to rejoin, and thus goes into a join/leave loop. JGroups version 2.2.8 introduced the srv_sock_bind_addr property, so you can specify the network interface where the FD_SOCK TCP server socket should be bound. This network interface is most likely the same interface used for other JGroups traffic. JGroups versions 2.2.9 and newer consult the bind.address system property, or you can specify the network interface directly via the FD_SOCK bind_addr property.
Table 7.20. Properties
Name | Description |
---|---|
bind_addr | The NIC on which the ServerSocket should listen on. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK |
bind_interface_str | The interface (NIC) which should be used by this transport |
client_bind_port | Start port for client socket. Default value of 0 picks a random port |
external_addr | Use "external_addr" if you have hosts on different networks, behind firewalls. On each firewall, set up a port forwarding rule (sometimes called "virtual server") to the local IP (e.g. 192.168.1.100) of the host; then, on each host, set the "external_addr" TCP transport parameter to the external (public IP) address of the firewall. |
external_port | Used to map the internal port (bind_port) to an external port. Only used if > 0 |
get_cache_timeout | Timeout for getting socket cache from coordinator. Default is 1000 msec |
keep_alive | Whether to use KEEP_ALIVE on the ping socket or not. Default is true |
num_tries | Number of attempts coordinator is solicited for socket cache until we give up. Default is 3 |
port_range | Number of ports to probe for start_port and client_bind_port |
sock_conn_timeout | Max time in millis to wait for ping Socket.connect() to return |
start_port | Start port for server socket. Default value of 0 picks a random port |
suspect_msg_interval | Interval for broadcasting suspect messages. Default is 5000 msec |
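FD_SOCK usually needs no configuration; a sketch pinning the server socket to a fixed port range (the values are illustrative):
<FD_SOCK start_port="9777" port_range="5" />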
FD_PING uses a script or command that is run with 1 argument (the host to be pinged) and needs to return 0 (success) or 1 (failure). The default command is /sbin/ping (ping.exe on Windows), but this is user configurable and can be replaced with any user-provided script or executable.
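A sketch for illustration only; the cmd property name and the values below are assumptions, not verified against a specific JGroups version:
<FD_PING cmd="/sbin/ping" timeout="3000" max_tries="3" />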
Verifies that a suspected member is really dead by pinging that member one last time before excluding it, and dropping the suspect message if the member does respond.
VERIFY_SUSPECT tries to minimize false suspicions.
The protocol works as follows: it catches SUSPECT events traveling up the stack. Then it verifies that the suspected member is really dead. If so, it passes the SUSPECT event up the stack; otherwise it discards it. VERIFY_SUSPECT has to be placed somewhere above the failure detection protocol and below the GMS protocol (the receiver of the SUSPECT event). Note that SUSPECT events may be reordered by this protocol.
Table 7.21. Properties
Name | Description |
---|---|
bind_addr | Interface for ICMP pings. Used if use_icmp is true The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK |
bind_interface_str | The interface (NIC) which should be used by this transport |
num_msgs | Number of verify heartbeats sent to a suspected member |
timeout | Number of millisecs to wait for a response from a suspected member |
use_icmp | Use InetAddress.isReachable() to verify suspected member instead of regular messages |
use_mcast_rsps | Send the I_AM_NOT_DEAD message back as a multicast rather than as multiple unicasts (default is false) |
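One common placement, mirroring the stock udp.xml, is directly above the failure detectors:
<FD_SOCK />
<FD_ALL />
<VERIFY_SUSPECT timeout="1500" />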
NAKACK provides reliable delivery and FIFO (= First In First Out) properties for messages sent to all nodes in a cluster.
Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message if that sequence number is not received.
FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.
NAKACK provides lossless, FIFO delivery of multicast messages, using negative acks. E.g. when receiving P:1, P:3, P:4, a receiver delivers only P:1, and asks P for retransmission of message 2, queuing P:3 and P:4. When P:2 is finally received, the receiver will deliver P:2-P:4 to the application.
Table 7.22. Properties
Name | Description |
---|---|
become_server_queue_size | Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue. |
discard_delivered_msgs | Should messages delivered to application be discarded |
exponential_backoff | The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0 |
log_discard_msgs | discards warnings about promiscuous traffic |
log_not_found_msgs | If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing) |
max_msg_batch_size | Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it ! |
max_rebroadcast_timeout | Timeout to rebroadcast messages. Default is 2000 msec |
print_stability_history_on_failed_xmit | Should stability history be printed if we fail in retransmission. Default is false |
retransmit_timeouts | Timeout before requesting retransmissions |
suppress_time_non_member_warnings | Time during which identical warnings about messages from a non member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this. |
use_mcast_xmit | Retransmit retransmit responses (messages) using multicast rather than unicast |
use_mcast_xmit_req | Use a multicast to request retransmission of missing messages |
use_range_based_retransmitter | Whether to use the old retransmitter which retransmits individual messages or the new one which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur |
xmit_from_random_member | Ask a random member for retransmission of a missing message. Default is false |
xmit_stagger_timeout | Number of milliseconds to delay the sending of an XMIT request. We pick a random number in the range [1 .. xmit_stagger_timeout] and add this to the scheduling time of an XMIT request. When use_mcast_xmit is enabled, if a number of members drop messages from the same member, then chances are that, if staggering is enabled, somebody else already sent the XMIT request (via mcast) and we can cancel the XMIT request once we receive the missing messages. For unicast XMIT responses (use_mcast_xmit=false), we still have an advantage by not overwhelming the receiver with XMIT requests, all at the same time. 0 disables staggering. |
xmit_table_max_compaction_time | Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts) |
xmit_table_msgs_per_row | Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row |
xmit_table_num_rows | Number of rows of the matrix in the retransmission table (only for experts) |
xmit_table_resize_factor | Resize factor of the matrix in the retransmission table (only for experts) |
NAKACK2 was introduced in 3.1 and is a successor to NAKACK (at some point it will replace NAKACK). It has the same properties as NAKACK, but its implementation is faster and uses less memory, plus it creates fewer tasks in the timer.
Some of the properties of NAKACK were deprecated in NAKACK2, but were not removed so people can simply change from NAKACK to NAKACK2 by changing the protocol name in the config.
Table 7.23. Properties
Name | Description |
---|---|
become_server_queue_size | Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue. |
discard_delivered_msgs | Should messages delivered to application be discarded |
log_discard_msgs | discards warnings about promiscuous traffic |
log_not_found_msgs | If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing) |
max_msg_batch_size | Max number of messages to be removed from a RingBuffer. This property might get removed anytime, so don't use it ! |
max_rebroadcast_timeout | Timeout to rebroadcast messages. Default is 2000 msec |
print_stability_history_on_failed_xmit | Should stability history be printed if we fail in retransmission. Default is false |
suppress_time_non_member_warnings | Time during which identical warnings about messages from a non member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this. |
use_mcast_xmit | Retransmit retransmit responses (messages) using multicast rather than unicast |
use_mcast_xmit_req | Use a multicast to request retransmission of missing messages |
xmit_from_random_member | Ask a random member for retransmission of a missing message. Default is false |
xmit_interval | Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted |
xmit_table_max_compaction_time | Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts) |
xmit_table_msgs_per_row | Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row |
xmit_table_num_rows | Number of rows of the matrix in the retransmission table (only for experts) |
xmit_table_resize_factor | Resize factor of the matrix in the retransmission table (only for experts) |
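A sketch of NAKACK2 in a stack (the values mirror common stock configurations; note the pbcast package prefix used in the XML):
<pbcast.NAKACK2 xmit_interval="500" use_mcast_xmit="false" discard_delivered_msgs="true" />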
UNICAST provides reliable delivery and FIFO (= First In First Out) properties for point-to-point messages between one sender and one receiver.
Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message if that sequence number is not received.
FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.
UNICAST uses positive acks for retransmission; sender A keeps sending message M until receiver B delivers M and sends an ack(M) to A, or until B leaves the cluster or A crashes.
Although JGroups attempts to send acks selectively, UNICAST will still see a lot of acks on the wire. If this is not desired, use UNICAST2 (see Section 7.6.4, “UNICAST2”).
On top of a reliable transport, such as TCP, UNICAST is not really needed. However, concurrent delivery of messages from the same sender is prevented by UNICAST by acquiring a lock on the sender's retransmission table, so unless concurrent delivery is desired, UNICAST should not be removed from the stack even if TCP is used.
Table 7.24. Properties
Name | Description |
---|---|
conn_expiry_timeout | Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping |
max_msg_batch_size | Max number of messages to be removed from a retransmit window. This property might get removed anytime, so don't use it ! |
max_retransmit_time | Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this |
segment_capacity | Size (in bytes) of a Segment in the segments table. Only for experts, do not use ! |
timeout | n/a |
xmit_interval | Interval (in milliseconds) at which messages in the send windows are resent |
xmit_table_max_compaction_time | Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts) |
xmit_table_msgs_per_row | Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row |
xmit_table_num_rows | Number of rows of the matrix in the retransmission table (only for experts) |
xmit_table_resize_factor | Resize factor of the matrix in the retransmission table (only for experts) |
UNICAST2 provides lossless, ordered, communication between 2 members. Contrary to UNICAST, it uses negative acks (similar to NAKACK) rather than positive acks. This reduces the communication overhead required for sending an ack for every message.
Negative acks have sender A simply send messages without retransmission, and receivers never ack messages, until they detect a gap: for instance, if A sends messages 1,2,4,5, then B delivers 1 and 2, but queues 4 and 5 because it is missing message 3 from A. B then asks A to retransmit 3. When 3 is received, messages 3, 4 and 5 can be delivered to the application.
Compared to a positive ack scheme as used in UNICAST, negative acks have the advantage that they generate less traffic: if all messages are received in order, we never need to do retransmission.
Table 7.25. Properties
Name | Description |
---|---|
conn_expiry_timeout | Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping |
exponential_backoff | The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0 |
log_not_found_msgs | If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing) |
max_bytes | Max number of bytes before a stability message is sent to the sender |
max_msg_batch_size | Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it ! |
max_retransmit_time | Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this |
max_stable_msgs | Max number of STABLE messages sent for the same highest_received seqno. A value < 1 is invalid |
stable_interval | Max number of milliseconds before a stability message is sent to the sender(s) |
timeout | list of timeouts |
use_range_based_retransmitter | Whether to use the old retransmitter which retransmits individual messages or the new one which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur |
xmit_interval | Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted |
xmit_table_automatic_purging | If enabled, the removal of a message from the retransmission table causes an automatic purge (only for experts) |
xmit_table_max_compaction_time | Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts) |
xmit_table_msgs_per_row | Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row |
xmit_table_num_rows | Number of rows of the matrix in the retransmission table (only for experts) |
xmit_table_resize_factor | Resize factor of the matrix in the retransmission table (only for experts) |
UNICAST3 (available in 3.3) is the successor to UNICAST2, but is based on UNICAST, as it uses a positive acknowledgment mechanism. However, speed-wise it is similar to UNICAST2.
Details of UNICAST3's design can be found in the UNICAST3 design document.
Table 7.26. Properties
Name | Description |
---|---|
ack_batches_immediately | Send an ack for a batch immediately instead of using a delayed ack |
conn_close_timeout | Time (in ms) until a connection marked to be closed will get removed. 0 disables this |
conn_expiry_timeout | Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping |
log_not_found_msgs | If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing) |
max_msg_batch_size | Max number of messages to be removed from a retransmit window. This property might get removed anytime, so don't use it ! |
max_retransmit_time | Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this |
xmit_interval | Interval (in milliseconds) at which messages in the send windows are resent |
xmit_table_max_compaction_time | Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts) |
xmit_table_msgs_per_row | Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row |
xmit_table_num_rows | Number of rows of the matrix in the retransmission table (only for experts) |
xmit_table_resize_factor | Resize factor of the matrix in the retransmission table (only for experts) |
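A minimal sketch using properties from the table above (the values are illustrative):
<UNICAST3 xmit_interval="500" conn_expiry_timeout="0" />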
The RSVP protocol is not a reliable delivery protocol per se, but augments reliable protocols such as NAKACK, UNICAST or UNICAST2. It should be placed somewhere above these in the stack.
Table 7.27. Properties (experimental)
Name | Description |
---|---|
ack_on_delivery | When true, we pass the message up to the application and only then send an ack. When false, we send an ack first and only then pass the message up to the application. |
resend_interval | Interval (in milliseconds) at which we resend the RSVP request. Needs to be < timeout. 0 disables it. |
throw_exception_on_timeout | Whether an exception should be thrown when the timeout kicks in, and we haven't yet received all acks. An exception would be thrown all the way up to JChannel.send() |
timeout | Max time in milliseconds to block for an RSVP'ed message (0 blocks forever). |
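A sketch with illustrative values; note that resend_interval must be smaller than timeout:
<RSVP timeout="10000" resend_interval="2000" ack_on_delivery="true" />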
To serve potential retransmission requests, a member has to store received messages until it is known that every member in the cluster has received them. Message stability for a given message M means that M has been seen by everyone in the cluster.
The stability protocol periodically (or when a certain number of bytes have been received) initiates a consensus protocol, which multicasts a stable message containing the highest message numbers for a given member. This is called a digest.
When everyone has received everybody else's stable messages, a digest is computed which consists of the minimum sequence numbers of all received digests so far. This is the stability vector, and it contains only message sequence numbers that have been seen by everyone.
This stability vector is then broadcast to the group, and everyone can remove messages from their retransmission tables whose sequence numbers are smaller than the ones received in the stability vector. These messages can then be garbage collected.
STABLE garbage collects messages that have been seen by all members of a cluster. Each member has to store all messages because it may be asked to retransmit. Only when we are sure that all members have seen a message can it be removed from the retransmission buffers. STABLE periodically gossips its highest and lowest messages seen. The lowest value is used to compute the min (all lowest seqnos for all members), and messages with a seqno below that min can safely be discarded.
Note that STABLE can also be configured to run when N bytes have been received. This is recommended when sending messages at a high rate, because sending stable messages based on time might accumulate messages faster than STABLE can garbage collect them.
Table 7.28. Properties
Name | Description |
---|---|
cap | Max percentage of the max heap (-Xmx) to be used for max_bytes. Only used if ergonomics is enabled. 0 disables setting max_bytes dynamically. |
desired_avg_gossip | Average time to send a STABLE message |
max_bytes | Maximum number of bytes received in all messages before sending a STABLE message is triggered |
send_stable_msgs_to_coord_only | Whether to send the STABLE messages to all members of the cluster, or to the current coordinator only. The latter reduces the number of STABLE messages, but also generates more work on the coordinator |
stability_delay | Delay before stability message is sent |
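A sketch combining time-based and size-based stability (the values mirror common stock configurations):
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="4000000" />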
Group membership takes care of joining new members, handling leave requests by existing members, and handling SUSPECT messages for crashed members, as emitted by failure detection protocols. The algorithm for joining a new member is essentially:
loop:
    find initial members (discovery)
    if no responses:
        become singleton group and break out of the loop
    else:
        determine the coordinator (oldest member) from the responses
        send JOIN request to coordinator
        wait for JOIN response
        if JOIN response received:
            install view and break out of the loop
        else:
            sleep for 5 seconds and continue the loop
Table 7.29. Properties
Name | Description |
---|---|
flushInvokerClass | |
handle_concurrent_startup | Temporary switch. Default is true and should not be changed |
join_timeout | Join timeout |
leave_timeout | Leave timeout |
log_collect_msgs | Logs failures for collecting all view acks if true |
log_view_warnings | Logs warnings for reception of views less than the current, and for views which don't include self |
max_bundling_time | Max view bundling timeout if view bundling is turned on. Default is 50 msec |
max_join_attempts | Number of join attempts before we give up and become a singleton. Zero means 'never give up'. |
merge_timeout | Timeout (in ms) to complete merge |
num_prev_mbrs | Max number of old members to keep in history. Default is 50 |
num_prev_views | Number of views to store in history |
print_local_addr | Print local address of this member after connect. Default is true |
print_physical_addrs | Print physical address(es) on startup |
resume_task_timeout | Timeout to resume ViewHandler |
use_flush_if_present | Use flush for view changes. Default is true |
view_ack_collection_timeout | Time in ms to wait for all VIEW acks (0 == wait forever). Default is 2000 msec |
view_bundling | View bundling toggle |
Consider the following situation: a new member wants to join a group. The procedure to do so is:
Multicast an (unreliable) discovery request (ping)
Wait for n responses or m milliseconds (whichever is first)
Every member responds with the address of the coordinator
If the initial responses are > 0: determine the coordinator and start the JOIN protocol
If the initial responses are 0: become coordinator, assuming that no one else is out there
However, the problem is that the initial mcast discovery request might get lost, e.g. when multiple members start at the same time, the outgoing network buffer might overflow, and the mcast packet might get dropped. Nobody receives it and thus the sender will not receive any responses, resulting in an initial membership of 0. This could result in multiple coordinators, and multiple subgroups forming. How can we overcome this problem? There are two solutions:
Increase the timeout, or the number of responses required. This will only help if the reason for the empty membership was a slow host. If the mcast packet was dropped, this solution won't help.
Add the MERGE2 or MERGE3 protocol. This doesn't actually prevent multiple initial coordinators, but rectifies the problem by merging different subgroups back into one. Note that this might involve state merging, which needs to be done by the application.
Flow control takes care of adjusting the rate of a message sender to the rate of the slowest receiver over time. If a sender continuously sends messages at a rate that is faster than the receiver(s), the receivers will either queue up messages, or the messages will get discarded by the receiver(s), triggering costly retransmissions. In addition, there is spurious traffic on the cluster, causing even more retransmissions.
Flow control throttles the sender so the receivers are not overrun with messages.
Note that flow control can be bypassed by setting message flag Message.NO_FC. See Section 5.13, “Tagging messages with flags” for details.
The properties for FlowControl are shown below and can be used in MFC and UFC:
Table 7.30. Properties
Name | Description |
---|---|
ignore_synchronous_response | Does not block a down message if it is a result of handling an up message in the same thread. Fixes JGRP-928 |
max_block_time | Max time (in milliseconds) to block. Default is 5000 msec |
max_block_times | Max times to block for the listed message sizes (Message.getLength()). Example: "1000:10,5000:30,10000:500" |
max_credits | Max number of bytes to send per receiver until an ack must be received to proceed |
min_credits | Computed as max_credits x min_threshold unless explicitly set |
min_threshold | The threshold (as a percentage of max_credits) at which a receiver sends more credits to a sender. Example: if max_credits is 1'000'000, and min_threshold 0.25, then we send ca. 250'000 credits to P once we've got only 250'000 credits left for P (we've received 750'000 bytes from P) |
FC uses a credit-based system, where each sender has max_credits credits and decrements them whenever a message is sent. The sender blocks when the credits fall below 0, and only resumes sending messages when it receives a replenishment message from the receivers.
The receivers maintain a table of credits for all senders and decrement the given sender's credits as well, when a message is received.
When a sender's credits drop below a threshold, the receiver will send a replenishment message to the sender. The threshold is defined by min_bytes or min_threshold.
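The sketch below illustrates this credit accounting (illustrative only, not the actual FC implementation): the sender decrements its credits on every send, blocks for at most max_block_time when it runs out, and resumes when a replenishment message arrives.

// Illustrative sketch only, not the actual FC implementation.
public class CreditSketch {
    private long credits;            // corresponds to max_credits
    private final long maxBlockTime; // corresponds to max_block_time (ms)

    public CreditSketch(long maxCredits, long maxBlockTime) {
        this.credits = maxCredits;
        this.maxBlockTime = maxBlockTime;
    }

    // called by the sender before sending a message of the given length
    public synchronized void decrement(int length) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxBlockTime;
        while (credits - length < 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                break; // stop blocking after max_block_time, as FC does
            wait(remaining);
        }
        credits -= length;
    }

    // called when a replenishment message arrives from a receiver
    public synchronized void replenish(long amount) {
        credits += amount;
        notifyAll();
    }
}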
Table 7.31. Properties
Name | Description |
---|---|
ignore_synchronous_response | Does not block a down message if it is a result of handling an up message in the same thread. Fixes JGRP-928 |
max_block_time | Max time (in milliseconds) to block. Default is 5000 msec |
max_block_times | Max times to block for the listed message sizes (Message.getLength()). Example: "1000:10,5000:30,10000:500" |
max_credits | Max number of bytes to send per receiver until an ack must be received to proceed. Default is 500000 bytes |
min_credits | Computed as max_credits x min_threshold unless explicitly set |
min_threshold | The threshold (as a percentage of max_credits) at which a receiver sends more credits to a sender. Example: if max_credits is 1'000'000, and min_threshold 0.25, then we send ca. 250'000 credits to P once we've got only 250'000 credits left for P (we've received 750'000 bytes from P) |
FC has been deprecated, use MFC and UFC instead.
In 2.10, FC was separated into MFC (Multicast Flow Control) and UFC (Unicast Flow Control). The reason was that multicast flow control should not be impeded by unicast flow control, and vice versa. Also, performance of the separate implementations could be increased, and they can be individually omitted. For example, if no unicast flow control is needed, UFC can be left out of the stack configuration.
FRAG and FRAG2 fragment large messages into smaller ones, send the smaller ones, and at the receiver's side, the smaller fragments are reassembled into larger messages again and delivered to the application. FRAG and FRAG2 work for both unicast and multicast messages.
The difference between FRAG and FRAG2 is that FRAG2 does one less copy than FRAG, so it is the recommended fragmentation protocol. FRAG serializes a message to know the exact size required (including headers), whereas FRAG2 only fragments the payload (excluding the headers), so it is faster.
The properties of FRAG2 are:
Table 7.32. Properties
Name | Description |
---|---|
frag_size | The max number of bytes in a message. Larger messages will be fragmented |
Contrary to FRAG, FRAG2 does not need to serialize a message in order to break it into smaller fragments: it looks only at the message's buffer, which is a byte array anyway. We assume that the size addition for headers and src and dest addresses is minimal when the transport finally has to serialize the message, so we add a constant (by default 200 bytes). Because of the efficiency gained by not having to serialize the message just to determine its size, FRAG2 is generally recommended over FRAG.
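As an illustration of the send path, the sketch below (not the actual FRAG2 code) splits a payload into frag_size-sized chunks; the receiver reassembles the original message by concatenating the chunks in order.

// Illustrative sketch only, not the actual FRAG2 code.
public class FragmentSketch {
    public static java.util.List<byte[]> fragment(byte[] payload, int fragSize) {
        java.util.List<byte[]> frags = new java.util.ArrayList<byte[]>();
        for (int offset = 0; offset < payload.length; offset += fragSize) {
            int len = Math.min(fragSize, payload.length - offset);
            byte[] frag = new byte[len];
            System.arraycopy(payload, offset, frag, 0, len);
            frags.add(frag); // each fragment is sent as its own message
        }
        return frags;
    }
}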
SEQUENCER provides total order for multicast (=group) messages by forwarding messages to the current coordinator, which then sends the messages to the cluster on behalf of the original sender. Because it is always the same sender (whose messages are delivered in FIFO order), a global (or total) order is established.
Sending members add every forwarded message M to a buffer and remove M when they receive it. Should the current coordinator crash, all buffered messages are forwarded to the new coordinator.
Table 7.33. Properties
Name | Description |
---|---|
delivery_table_max_size | Size of the set to store received seqnos (for duplicate checking) |
threshold | Number of acks needed before going from ack-mode to normal mode. 0 disables this, which means that ack-mode is always on |
A total order anycast is a totally ordered message sent to a subset of the cluster members. TOA intercepts messages with an AnycastMessage (carrying a list of addresses) and handles sending of the message in total order. Say the cluster is {A,B,C,D,E} and the Anycast is to {B,C}.
Skeen's algorithm is used to send the message: B and C each maintain a logical clock (a counter). When a message is to be sent, TOA contacts B and C and asks them for their counters. B and C return their counters (incrementing them for the next request).
The originator of the message then sets the message's ID to be the max of all returned counters and sends the message. Receivers then deliver the messages in order of their IDs.
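A minimal sketch of the ID assignment step (illustrative only; the class and method names are made up): the originator collects the counter values returned by the destination members and uses their maximum as the message's ID.

// Illustrative sketch only; receivers deliver messages in ascending ID order.
public class SkeenSketch {
    public static long assignMessageId(java.util.Collection<Long> returnedCounters) {
        long id = 0;
        for (long counter : returnedCounters)
            id = Math.max(id, counter); // the message ID is the max of all counters
        return id;
    }
}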
The main use of TOA is currently in Infinispan's transactional caches with partial replication: it is used to apply transactional modifications in total order, so that no two-phase commit protocol has to be run and no locks have to be acquired.
As shown in "Exploiting Total Order Multicast in Weakly Consistent Transactional Caches", when we have many conflicts by different transactions modifying the same keys, TOM fares better than 2PC.
Note that TOA is experimental (as of 3.1).
STATE_TRANSFER is the existing transfer protocol, which transfers byte[] buffers around. However, at the state provider's side, JGroups creates an output stream over the byte[] buffer and passes the output stream to the getState(OutputStream) callback, and at the state requester's side, an input stream is created and passed to the setState(InputStream) callback.
This allows us to continue using STATE_TRANSFER until the new state transfer protocols replace it (perhaps in 4.0).
In order to transfer application state to a joining member of a cluster, STATE_TRANSFER has to load the entire state into memory and send it to the joining member. The major limitation of this approach is that very large state transfers are likely to result in memory exhaustion.
For large state transfer use either the STATE or STATE_SOCK protocol. However, if the state is small, STATE_TRANSFER is okay.
StreamingStateTransfer is the superclass of STATE and STATE_SOCK (see below). Its properties are:
Table 7.34. Properties
Name | Description |
---|---|
buffer_size | Size (in bytes) of the state transfer buffer |
max_pool | Maximum number of pool threads serving state requests |
pool_thread_keep_alive | Keep alive for pool threads serving state requests |
STATE was renamed from (2.x) STREAMING_STATE_TRANSFER, and refactored to extend a common superclass StreamingStateTransfer. The other state transfer protocol extending StreamingStateTransfer is STATE_SOCK (see Section 3.8.11.1.3, “STATE_SOCK”).
STATE uses a streaming approach to state transfer: the state provider writes its state to the output stream passed to it in the getState(OutputStream) callback, and the stream is split into chunks that are sent to the state requester in separate messages. The state requester receives those chunks and feeds them into the input stream from which the state is read by the setState(InputStream) callback.
The advantage compared to STATE_TRANSFER is that state provider and requester only need small (transfer) buffers to keep a part of the state in memory, whereas STATE_TRANSFER needs to copy the entire state into memory.
If, for example, we have a list of 1 million elements, STATE_TRANSFER would have to create a byte[] buffer out of it and return that buffer, whereas a streaming approach can iterate through the list and write each element to the output stream. Whenever the buffer capacity is reached, a message is sent and the buffer is reused for more data.
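A minimal sketch of the two callbacks, assuming the 3.x ReceiverAdapter API; the list-based state and the use of java.io object streams are illustration choices, not something mandated by JGroups:

import java.io.*;
import java.util.*;
import org.jgroups.ReceiverAdapter;

public class StateExample extends ReceiverAdapter {
    private final List<String> state = new ArrayList<String>();

    // state provider: stream the state out through the provided output stream
    public void getState(OutputStream output) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(output);
        synchronized (state) {
            out.writeObject(new ArrayList<String>(state));
        }
        out.flush();
    }

    // state requester: read the state back in from the provided input stream
    @SuppressWarnings("unchecked")
    public void setState(InputStream input) throws Exception {
        ObjectInputStream in = new ObjectInputStream(input);
        List<String> newState = (List<String>) in.readObject();
        synchronized (state) {
            state.clear();
            state.addAll(newState);
        }
    }
}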
STATE_SOCK is also a streaming state transfer protocol, but compared to STATE, it doesn't send the chunks as messages, but uses a TCP socket connection between state provider and requester to transfer the state.
The state provider creates a server socket at a configurable bind address and port, and the address and port are sent back to a state requester in the state response. The state requester then establishes a socket connection to the server socket and passes the socket's input stream to the setState(InputStream) callback.
The configuration options of STATE_SOCK are listed below:
Table 7.35. Properties
Name | Description |
---|---|
bind_addr | The interface (NIC) used to accept state requests. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK |
bind_interface_str | The interface (NIC) which should be used by this transport |
bind_port | The port listening for state requests. Default value of 0 binds to any (ephemeral) port |
external_addr | Use "external_addr" if you have hosts on different networks, behind firewalls. On each firewall, set up a port forwarding rule (sometimes called "virtual server") to the local IP (e.g. 192.168.1.100) of the host then on each host, set "external_addr" TCP transport parameter to the external (public IP) address of the firewall. |
external_port | Used to map the internal port (bind_port) to an external port. Only used if > 0 |
BARRIER is used by some of the state transfer protocols; it lets existing threads complete and blocks new threads, so that the digest and the state can be fetched in one go.
In 3.1, a new mechanism for state transfer will be implemented, eliminating the need for BARRIER. Until then, BARRIER should be used when one of the state transfer protocols is used. BARRIER is part of every default stack which contains a state transfer protocol.
Table 7.36. Properties
Name | Description |
---|---|
max_close_time | Max time barrier can be closed. Default is 60000 ms |
Flushing forces group members to send all their pending messages prior to a certain event. The process of flushing acquiesces the cluster so that state transfer or a join can be done. It is also called the stop-the-world model, as nobody will be able to send messages while a flush is in progress. Flush is used in:
State transfer
When a member requests state transfer, it tells everyone to stop sending messages and waits for everyone's ack. Once it has received everyone's ack, the application asks the coordinator for its state and ships it back to the requester. After the requester has received and set the state successfully, the requester tells everyone to resume sending messages.
View changes (e.g. a join). Before installing a new view V2, flushing ensures that all messages sent in the current view V1 are indeed delivered in V1, rather than in V2 (in all non-faulty members). This is essentially Virtual Synchrony.
FLUSH is designed as another protocol positioned just below the channel, on top of the stack (e.g. above STATE_TRANSFER). The STATE_TRANSFER and GMS protocols request a flush by sending an event up the stack, where it is handled by the FLUSH protocol. Another event is sent back by the FLUSH protocol to let the caller know that the flush has completed. When done (e.g. the view was installed or the state transferred), the protocol sends a message, which allows everyone in the cluster to resume sending.
A channel is notified that the FLUSH phase has been started via the Receiver.block() callback.
Read more about flushing in Section 5.7, “Flushing: making sure every node in the cluster received a message”.
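A minimal sketch of reacting to a flush in application code, assuming the 3.x ReceiverAdapter API with block() and unblock() callbacks; the canSend flag is only an illustration:

import org.jgroups.ReceiverAdapter;

public class FlushAwareReceiver extends ReceiverAdapter {
    private volatile boolean canSend = true;

    public void block() {
        canSend = false; // flush has started: stop sending messages
    }

    public void unblock() {
        canSend = true;  // flush is over: sending may resume
    }

    public boolean canSend() {
        return canSend;
    }
}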
Table 7.37. Properties
Name | Description |
---|---|
bypass | When set, FLUSH is bypassed, same effect as if FLUSH wasn't in the config at all |
enable_reconciliation | Reconciliation phase toggle. Default is true |
end_flush_timeout | Timeout to wait for UNBLOCK after STOP_FLUSH is issued. Default is 2000 msec |
retry_timeout | Retry timeout after an unsuccessful attempt to quiet the cluster (first flush phase). Default is 3000 msec |
start_flush_timeout | Timeout (per attempt) to quiet the cluster during the first flush phase. Default is 2000 msec |
timeout | Max time to keep channel blocked in flush. Default is 8000 msec |
STATS exposes various statistics, e.g. the number of received multicast and unicast messages, the number of bytes sent, etc. It should be placed directly over the transport.
JGroups provides protocols to encrypt cluster traffic (ENCRYPT), and to make sure that only authorized members can join a cluster (AUTH).
A detailed description of ENCRYPT is found in the JGroups source (JGroups/doc/ENCRYPT.html).
Encryption by default only encrypts the message body, but doesn't encrypt message headers. To encrypt the entire message (including all headers, plus destination and source addresses), the property encrypt_entire_message has to be set to true. Also, ENCRYPT has to be below any protocols whose headers we want to encrypt, e.g.
<config ... >
<UDP />
<PING />
<MERGE2 />
<FD />
<VERIFY_SUSPECT />
<ENCRYPT encrypt_entire_message="false"
sym_init="128" sym_algorithm="AES/ECB/PKCS5Padding"
asym_init="512" asym_algorithm="RSA"/>
<pbcast.NAKACK />
<UNICAST />
<pbcast.STABLE />
<FRAG2 />
<pbcast.GMS />
</config>
Note that ENCRYPT sits below NAKACK and UNICAST, so the sequence numbers for these two protocols will be encrypted. Had ENCRYPT been placed below UNICAST but above NAKACK, then only UNICAST's headers (including sequence numbers) would have been encrypted, but not NAKACK's.
Note that it doesn't make too much sense to place ENCRYPT even lower in the stack, because then almost all traffic (even merge or discovery traffic) will be encrypted, which may be somewhat of a performance drag.
When we encrypt an entire message, we have to marshal the message into a byte buffer first and then encrypt it. This entails marshalling and copying of the byte buffer, which hurts performance.
ENCRYPT uses store type JCEKS, whereas keytool uses JKS by default; a keystore generated with the keytool defaults will therefore not be accessible. To generate a keystore compatible with JCEKS, use the following command line options to keytool:
keytool -genseckey -alias myKey -keypass changeit -storepass changeit -keyalg Blowfish -keysize 56 -keystore defaultStore.keystore -storetype JCEKS
ENCRYPT could then be configured as follows:
<ENCRYPT key_store_name="defaultStore.keystore"
store_password="changeit"
alias="myKey"/>
Note that defaultStore.keystore will have to be found on the classpath.
Table 7.38. Properties
Name | Description |
---|---|
alias | Alias used for recovering the key. Change the default |
asymAlgorithm | Cipher engine transformation for asymmetric algorithm. Default is RSA |
asymInit | Initial public/private key length. Default is 512 |
asymProvider | Cryptographic Service Provider. Default is Bouncy Castle Provider |
encrypt_entire_message | |
keyPassword | Password for recovering the key. Change the default |
keyStoreName | File on classpath that contains keystore repository |
storePassword | Password used to check the integrity/unlock the keystore. Change the default |
symAlgorithm | Cipher engine transformation for symmetric algorithm. Default is AES |
symInit | Initial key length for matching symmetric algorithm. Default is 128 |
symProvider | Cryptographic Service Provider. Default is Bouncy Castle Provider |
AUTH is used to provide a layer of authentication to JGroups. This allows you to define pluggable security that determines whether a node should be allowed to join a cluster. AUTH sits below the GMS protocol and listens for JOIN REQUEST messages. When a JOIN REQUEST is received, it tries to find an AuthHeader object, inside of which should be an implementation of the AuthToken object.
AuthToken is an abstract class, implementations of which are responsible for providing the actual authentication mechanism. Some basic implementations of AuthToken are provided in the org.jgroups.auth package (SimpleToken, MD5Token and X509Token). Effectively, all these implementations do is encrypt a string (found in the JGroups config) and pass it in the JOIN REQUEST.
When authentication is successful, the message is simply passed up the stack to the GMS protocol. When it fails, the AUTH protocol creates a JOIN RESPONSE message with a failure string and passes it back down the stack. This failure string informs the client of the reason for failure; clients will then fail to join the group and will throw a SecurityException. If the error string is null, authentication is considered to have passed.
For more information refer to the wiki at http://community.jboss.org/wiki/JGroupsAUTH.
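As a sketch of what a custom token might look like, assuming the 3.x AuthToken API (getName(), authenticate() and the Streamable writeTo()/readFrom() methods); the class name and the hard-coded secret are made up for illustration:

import java.io.DataInput;
import java.io.DataOutput;
import org.jgroups.Message;
import org.jgroups.auth.AuthToken;
import org.jgroups.util.Util;

public class SharedSecretToken extends AuthToken {
    protected String secret = "changeme"; // would normally come from the config

    public String getName() {
        return SharedSecretToken.class.getName();
    }

    // called on the coordinator to check the token sent with the JOIN REQUEST
    public boolean authenticate(AuthToken token, Message msg) {
        return token instanceof SharedSecretToken
            && secret.equals(((SharedSecretToken) token).secret);
    }

    public void writeTo(DataOutput out) throws Exception {
        Util.writeString(secret, out); // marshal the token for the wire
    }

    public void readFrom(DataInput in) throws Exception {
        secret = Util.readString(in);
    }
}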
COMPRESS compresses messages larger than min_size, and uncompresses them at the receiver's side. Property compression_level determines how thorough the compression algorithm should be (0: no compression, 9: highest compression).
Table 7.40. Properties
Name | Description |
---|---|
compression_level | Compression level (from java.util.zip.Deflater) (0=no compression, 1=best speed, 9=best compression). Default is 9 |
min_size | Minimal payload size of a message (in bytes) for compression to kick in. Default is 500 bytes |
pool_size | Number of inflaters/deflaters for concurrent processing. Default is 2 |
As discussed in Section 5.4.4, “Scopes: concurrent message delivery for messages from the same sender”, the SCOPE protocol is used to deliver updates to different scopes concurrently. It has to be placed somewhere above UNICAST and NAKACK.
SCOPE has a separate thread pool. The reason why the default thread pool from the transport wasn't used is that the default thread pool has a different purpose. For example, it can use a queue to which all incoming messages are added, which would defeat the purpose of concurrent delivery in SCOPE. As a matter of fact, using a queue would most likely delay messages getting sent up into SCOPE!
Also, the default pool's rejection policy might not be "run", so the SCOPE implementation would have to catch rejection exceptions and engage in a retry protocol, which is complex and wastes resources.
The configuration of the thread pool is shown below. If you expect concurrent messages to N different scopes, then the max pool size would ideally be set to N. However, in most cases this is not necessary, as (a) the messages might not be to different scopes or (b) not all N scopes might get messages at the same time. So even if the max pool size is a bit smaller, the cost is only slight delays, in the sense that a message for scope Y might wait until the thread processing a message for scope X is available.
To remove unused scopes, an expiry policy is provided: expiration_time is the number of milliseconds after which an idle scope is removed. An idle scope is a scope which hasn't seen any messages for expiration_time milliseconds. The expiration_interval value defines the number of milliseconds at which the expiry task runs. Setting both values to 0 disables expiration; it would then have to be done manually (see Section 5.4.4, “Scopes: concurrent message delivery for messages from the same sender” for details).
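A minimal sketch of tagging messages with a scope, assuming the 3.x Message.setScope(short) API; the session ID and payload are illustration values:

import org.jgroups.JChannel;
import org.jgroups.Message;

public class ScopeExample {
    // all messages of one session share a scope, so different sessions
    // can be delivered concurrently by SCOPE
    public static void sendScoped(JChannel ch, short sessionId, byte[] payload) throws Exception {
        Message msg = new Message(null, null, payload); // null dest == multicast
        msg.setScope(sessionId);
        ch.send(msg);
    }
}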
Table 7.41. Properties
Name | Description |
---|---|
expiration_interval | Interval in milliseconds at which the expiry task tries to remove expired scopes |
expiration_time | Time in milliseconds after which an expired scope will get removed. An expired scope is one to which no messages have been added in max_expiration_time milliseconds. 0 never expires scopes |
thread_naming_pattern | Thread naming pattern for threads in this channel. Default is cl |
thread_pool.keep_alive_time | Timeout in milliseconds to remove idle thread from regular pool |
thread_pool.max_threads | Maximum thread pool size for the regular thread pool |
thread_pool.min_threads | Minimum thread pool size for the regular thread pool |
RELAY bridges traffic between separate clusters, see Section 5.10, “Bridging between remote clusters” for details.
Table 7.42. Properties
Name | Description |
---|---|
bridge_name | Name of the bridge cluster |
bridge_props | Properties of the bridge cluster (e.g. tcp.xml) |
present_global_views | Drops views received from below and instead generates global views and passes them up. A global view consists of the local view and the remote view, ordered by view ID. If true, no protocol which requires (local) views can sit on top of RELAY |
relay | If set to false, don't perform relaying. Used e.g. for backup clusters; unidirectional replication from one cluster to another, but not back. Can be changed at runtime |
site | Description of the local cluster, e.g. "nyc". This is added to every address, so it should be short. This is a mandatory property and must be set |
RELAY2 provides clustering between different sites (local clusters), for multicast and unicast messages. See Section 5.11, “Relaying between multiple sites (RELAY2)” for details.
Table 7.43. Properties
Name | Description |
---|---|
async_relay_creation | If true, the creation of the relay channel (and the connect()) are done in the background. Async relay creation is recommended, so the view callback won't be blocked |
can_become_site_master | Whether or not this node can become the site master. If false, and we become the coordinator, we won't start the bridge(s) |
config | Name of the relay configuration |
enable_address_tagging | Whether or not we generate our own addresses in which we use can_become_site_master. If this property is false, can_become_site_master is ignored |
forward_sleep | The time (in milliseconds) to sleep between forward attempts |
fwd_queue_max_size | Max number of messages in the forward queue. Messages are added to the forward queue when the status of a route went from UP to UNKNOWN, and the queue is flushed when the status goes to UP (resending all queued messages) or DOWN (sending SITE-UNREACHABLE messages to the senders) |
max_forward_attempts | The number of tries to forward a message to a remote site |
relay_multicasts | Whether or not to relay multicast (dest=null) messages |
site | Name of the site (needs to be defined in the configuration) |
site_down_timeout | Number of milliseconds to wait when the status for a site changed from UP to UNKNOWN before that site is declared DOWN. A site that's DOWN triggers immediate sending of a SITE-UNREACHABLE message back to the sender of a message to that site |
warn_when_ftc_missing | If true, logs a warning if the FORWARD_TO_COORD protocol is not found. This property might get deprecated soon |
STOMP is discussed in Section 5.9, “STOMP support”. The properties for it are shown below:
Table 7.44. Properties (experimental)
Name | Description |
---|---|
bind_addr | The bind address which should be used by the server socket. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL and NON_LOOPBACK |
endpoint_addr | If set, then endpoint will be set to this address |
exact_destination_match | If set to false, then a destination of /a/b matches /a/b/c, /a/b/d, /a/b/c/d etc |
forward_non_client_generated_msgs | Forward received messages which don't have a StompHeader to clients |
port | Port on which the STOMP protocol listens for requests |
send_info | If true, information such as a list of endpoints, or views, will be sent to all clients (via the INFO command). This allows for example intelligent clients to connect to a different server should a connection be closed. |
The DAISYCHAIN protocol is discussed in Section 5.12, “Daisychaining”.
Table 7.45. Properties (experimental)
Name | Description |
---|---|
forward_queue_size | The number of messages in the forward queue. This queue is used to host messages that need to be forwarded by us on behalf of our neighbor |
loopback | Loop back multicast messages |
send_queue_size | The number of messages in the send queue. This queue is used to host messages that need to be sent |
RATE_LIMITER can be used to set a limit on the data sent per time unit. When sending data, only max_bytes can be sent per time_period milliseconds. E.g. if max_bytes="50M" and time_period="1000", then a sender can only send 50MBytes / sec max.
Table 7.46. Properties (experimental)
Name | Description |
---|---|
max_bytes | Max number of bytes to be sent in time_period ms. Blocks the sender if exceeded until a new time period has started |
time_period | Number of milliseconds during which max_bytes bytes can be sent |
There are currently 2 locking protocols: org.jgroups.protocols.CENTRAL_LOCK and org.jgroups.protocols.PEER_LOCK. Both extend Locking.
In CENTRAL_LOCK, the current coordinator of a cluster grants locks, so every node has to communicate with the coordinator to acquire or release a lock. Lock requests by different nodes for the same lock are processed in the order in which they are received.
The coordinator maintains a lock table. To prevent losing the knowledge of who holds which locks, the coordinator can push lock information to a number of backups, defined by num_backups. If num_backups is 0, no replication of lock information happens. If num_backups is greater than 0, then the coordinator pushes information about acquired and released locks to all backup nodes. Topology changes might create new backup nodes, and lock information is pushed to a node when it becomes a backup.
The advantage of CENTRAL_LOCK is that all lock requests are granted in the same order across the cluster, which is not the case with PEER_LOCK.
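A minimal sketch of acquiring a cluster-wide lock through org.jgroups.blocks.locking.LockService, assuming a stack that contains CENTRAL_LOCK (or PEER_LOCK); the config path, cluster name and lock name are illustration values:

import java.util.concurrent.locks.Lock;
import org.jgroups.JChannel;
import org.jgroups.blocks.locking.LockService;

public class LockExample {
    public static void main(String[] args) throws Exception {
        JChannel ch = new JChannel("/path/to/locking-config.xml"); // hypothetical config
        ch.connect("lock-cluster");
        LockService lockService = new LockService(ch);
        Lock lock = lockService.getLock("mylock");
        lock.lock();
        try {
            // critical section, protected cluster-wide
        } finally {
            lock.unlock();
        }
        ch.close();
    }
}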
Table 7.48. Properties
Name | Description |
---|---|
num_backups | Number of backups to the coordinator. Server locks get replicated to these nodes as well |
PEER_LOCK acquires a lock by contacting all cluster nodes, and lock acquisition is only successful if all non-faulty cluster nodes (peers) grant it.
Unless a total order configuration is used (e.g. org.jgroups.protocols.SEQUENCER based), lock requests for the same resource from different senders may be received in different order, so deadlocks can occur. Example: members A and B both try to acquire the same lock at roughly the same time. A receives its own request first, followed by B's, whereas B receives its own request first, followed by A's; each therefore grants the lock to itself and denies the other.
To acquire a lock, we need lock grants from both A and B, but this will never happen here. To fix this, either add SEQUENCER to the configuration, so that all lock requests are received in the same global order at both A and B, or use java.util.concurrent.locks.Lock.tryLock(long,TimeUnit) with retries if a lock cannot be acquired.
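A minimal sketch of the tryLock-with-retries workaround; the timeout, retry count and backoff values are illustration choices:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

public class TryLockExample {
    // try to acquire the lock with a timeout instead of blocking forever,
    // backing off randomly between attempts so two nodes don't deadlock
    public static boolean acquireWithRetries(Lock lock, int retries) throws InterruptedException {
        for (int i = 0; i < retries; i++) {
            if (lock.tryLock(1000, TimeUnit.MILLISECONDS))
                return true;
            Thread.sleep((long) (Math.random() * 500));
        }
        return false;
    }
}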
CENTRAL_EXECUTOR is an implementation of Executing which is needed by the ExecutionService.
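A minimal sketch of submitting a task via org.jgroups.blocks.executor.ExecutionService, assuming a stack that contains CENTRAL_EXECUTOR. Note that some node has to run an ExecutionRunner for tasks to actually be executed; the task and cluster name are made up for illustration:

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.jgroups.JChannel;
import org.jgroups.blocks.executor.ExecutionRunner;
import org.jgroups.blocks.executor.ExecutionService;

public class ExecutorExample {
    // tasks have to be serializable, as they may be shipped to other nodes
    static class Square implements Callable<Integer>, java.io.Serializable {
        private static final long serialVersionUID = 1L;
        final int n;
        Square(int n) { this.n = n; }
        public Integer call() { return n * n; }
    }

    public static void main(String[] args) throws Exception {
        JChannel ch = new JChannel(); // default stack plus CENTRAL_EXECUTOR assumed
        ch.connect("executor-cluster");
        new Thread(new ExecutionRunner(ch)).start(); // this node volunteers to run tasks
        ExecutionService execService = new ExecutionService(ch);
        Future<Integer> future = execService.submit(new Square(7));
        System.out.println("result = " + future.get()); // prints 49
        ch.close();
    }
}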
Table 7.50. Properties
Name | Description |
---|---|
num_backups | Number of backups to the coordinator. Queue State gets replicated to these nodes as well |
COUNTER is the implementation of cluster-wide counters, used by the CounterService.
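A minimal sketch of using a cluster-wide counter via org.jgroups.blocks.atomic.CounterService, assuming a stack that contains COUNTER; the counter and cluster names are illustration values:

import org.jgroups.JChannel;
import org.jgroups.blocks.atomic.Counter;
import org.jgroups.blocks.atomic.CounterService;

public class CounterExample {
    public static void main(String[] args) throws Exception {
        JChannel ch = new JChannel(); // default stack plus COUNTER assumed
        ch.connect("counter-cluster");
        CounterService counterService = new CounterService(ch);
        Counter counter = counterService.getOrCreateCounter("mycounter", 0);
        long value = counter.incrementAndGet(); // cluster-wide atomic increment
        System.out.println("counter = " + value);
        ch.close();
    }
}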
Table 7.51. Properties (experimental)
Name | Description |
---|---|
bypass_bundling | Bypasses message bundling if true |
num_backups | Number of backup coordinators. Modifications are asynchronously sent to all backup coordinators |
reconciliation_timeout | Number of milliseconds to wait for reconciliation responses from all current members |
timeout | Request timeouts (in ms). If the timeout elapses, a Timeout (runtime) exception will be thrown |
SUPERVISOR is a protocol which runs rules that periodically (or triggered by an event) check conditions and take corrective action if a condition is not met. Example: org.jgroups.protocols.rules.CheckFDMonitor is a rule which periodically checks if FD's monitor task is running when the cluster size is > 1. If not, the monitor task is started.
The SUPERVISOR is explained in more detail in Section 5.16, “Supervising a running stack”.
Table 7.52. Properties
Name | Description |
---|---|
config | Location of an XML file listing the rules to be installed |
[9] Note that NAKACK can also be configured to send retransmission requests for M to anyone in the cluster, rather than only to the sender of M.