Class Merger.MergeTask

  • All Implemented Interfaces:
    java.lang.Runnable
    Enclosing class:
    Merger

    protected class Merger.MergeTask
    extends java.lang.Object
    implements java.lang.Runnable
    Starts the merge protocol (run only by the merge leader). The leader sends a MERGE_REQ to the coordinators of all subgroups found. Each coordinator fetches its digest and view and returns them. The leader then computes the digest and view for the new group from the returned values. Finally, it sends the merged view and digest to all subgroup coordinators, each of which installs them in its subgroup.
    • Field Detail

      • thread

        protected java.lang.Thread thread
      • coords

        protected final java.util.concurrent.ConcurrentMap<Address,​java.util.Collection<Address>> coords
        Map of all subpartition coordinators to their members
      • subviews

        protected final java.util.Set<View> subviews
    • Constructor Detail

      • MergeTask

        protected MergeTask()
    • Method Detail

      • start

        public void start​(java.util.Map<Address,​View> views)
        Parameters:
        views - Guaranteed to be non-null and to have >= 2 members, or else this thread would not be started
      • stop

        public void stop()
      • isRunning

        public boolean isRunning()
      • run

        public void run()
        Specified by:
        run in interface java.lang.Runnable
      • _run

        protected void _run​(MergeId new_merge_id,
                            java.util.Collection<Address> coordsCopy)
                     throws java.lang.Exception
        Runs the merge protocol as a leader
        Throws:
        java.lang.Exception
      • getMergeDataFromSubgroupCoordinators

        protected boolean getMergeDataFromSubgroupCoordinators​(java.util.Map<Address,​java.util.Collection<Address>> coords,
                                                               MergeId new_merge_id,
                                                               long timeout)
        Sends a MERGE_REQ to all coords and populates a list of MergeData (in merge_rsps). Returns after coords.size() responses have been received or timeout msecs have elapsed, whichever comes first.

        If a subgroup coordinator rejects the MERGE_REQ (e.g. because it is participating in a different merge), that member is removed from coords!

        Parameters:
        coords - A map of coordinator addresses and associated membership lists
        new_merge_id - The new merge id
        timeout - Max number of msecs to wait for the merge responses from the subgroup coords
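
        The "return after N responses or after the timeout" behavior described above can be sketched roughly as follows. This is only an illustration, not the JGroups implementation: the class ResponseCollectorSketch, its methods, and the use of plain strings for addresses and merge data are all hypothetical, and the meaning given to the boolean result (true if every expected response arrived in time) is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: collects one response per coordinator and lets the
// caller block until all have arrived or a timeout has elapsed.
class ResponseCollectorSketch {
    private final Map<String, String> responses = new ConcurrentHashMap<>();
    private final CountDownLatch latch;

    ResponseCollectorSketch(int expected) {
        latch = new CountDownLatch(expected);
    }

    // Called when a response arrives; a second response from the same
    // sender is ignored so it cannot be counted twice.
    void add(String sender, String data) {
        if (responses.putIfAbsent(sender, data) == null)
            latch.countDown();
    }

    // Returns true if all expected responses arrived within timeoutMs
    // (assumed semantics), false on timeout or interruption.
    boolean waitForAll(long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int size() {
        return responses.size();
    }
}
```

        In this sketch a caller would create the collector with coords.size() as the expected count, hand incoming responses to add(), and call waitForAll(timeout) from the merge thread.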
      • removeRejectedMergeRequests

        protected void removeRejectedMergeRequests​(java.util.Collection<Address> coords)
        Removes rejected merge requests from merge_rsps and coords. This method holds a lock on merge_rsps.
      • consolidateMergeData

        protected MergeData consolidateMergeData​(java.util.List<MergeData> merge_rsps,
                                                 java.util.List<View> subviews)
        Merges all MergeData. All MergeData elements should be disjoint (both views and digests). However, this method is prepared to resolve duplicate entries for the same member. For views, only one of the duplicate members is kept. For digests, the higher seqnos of the duplicates are taken.

        After merging all members into a Membership and subsequent sorting, the first member of the sorted membership will be the new coordinator. This method has a lock on merge_rsps.

        Parameters:
        merge_rsps - A list of MergeData items. Elements with merge_rejected=true were removed beforehand. Guaranteed to be non-null and to contain at least 1 element
        subviews - A list of Views, each of which represents a subgroup
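
        The view-side resolution described above (keep one entry per member, sort, take the first member as coordinator) can be illustrated with a minimal sketch. It is not the actual method body: MembershipMergeSketch and its methods are hypothetical, members are modeled as plain strings, and natural string ordering stands in for whatever ordering the real Membership sort applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of merging subgroup memberships.
class MembershipMergeSketch {

    // Merges all subgroup member lists into one sorted list.
    // A TreeSet keeps a single entry per member, so duplicates
    // across subgroups collapse into one.
    static List<String> mergeMembers(List<List<String>> subgroups) {
        TreeSet<String> merged = new TreeSet<>();
        for (List<String> group : subgroups)
            merged.addAll(group);
        return new ArrayList<>(merged);
    }

    // The first member of the sorted membership becomes the coordinator.
    // Assumes members is non-empty.
    static String coordinator(List<String> members) {
        return members.get(0);
    }
}
```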
      • consolidateDigests

        protected MutableDigest consolidateDigests​(View new_view,
                                                   java.util.List<MergeData> merge_rsps)
        Merges all digests into one. For each sender, the new value is the pair (max(highest_delivered), max(highest_received)), with each maximum taken over all digests. This method holds a lock on merge_rsps.
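
        As a rough illustration of the per-sender maximum rule, the sketch below merges digests modeled as maps from a sender name to a two-element array {highest_delivered, highest_received}. The class DigestMergeSketch and this representation are assumptions made for the example; the real method works on MergeData and returns a MutableDigest.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of merging digests by taking per-sender maxima.
class DigestMergeSketch {

    // Each value is a long[2]: {highestDelivered, highestReceived}.
    static Map<String, long[]> merge(List<Map<String, long[]>> digests) {
        Map<String, long[]> result = new HashMap<>();
        for (Map<String, long[]> digest : digests) {
            for (Map.Entry<String, long[]> e : digest.entrySet()) {
                // First occurrence: store a copy. Later occurrences:
                // keep the element-wise maximum of old and new seqnos.
                result.merge(e.getKey(), e.getValue().clone(),
                    (a, b) -> new long[] {
                        Math.max(a[0], b[0]),
                        Math.max(a[1], b[1])
                    });
            }
        }
        return result;
    }
}
```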