/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald, Jan Wielemaker
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2019, Jeffrey Rosenwald
                   CWI, Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(paxos,
          [ paxos_get/1,                        % ?Term
            paxos_get/2,                        % +Key, -Value
            paxos_get/3,                        % +Key, -Value, +Options
            paxos_set/1,                        % ?Term
            paxos_set/2,                        % +Key, +Value
            paxos_set/3,                        % +Key, +Value, +Options
            paxos_on_change/2,                  % ?Term, +Goal
            paxos_on_change/3,                  % ?Key, ?Value, +Goal

            paxos_initialize/1,                 % +Options

            paxos_admin_key/2,                  % ?Name, ?Key
            paxos_property/1,                   % ?Property
            paxos_quorum_ask/4,                 % ?Templ, +Msg, -Result, +Options
                                                % Hook support
            paxos_replicate_key/3               % +Nodes, ?Key, +Options
          ]).
:- use_module(library(broadcast)).
:- use_module(library(debug)).
:- use_module(library(lists)).
:- use_module(library(settings)).
:- use_module(library(option)).
:- use_module(library(error)).
:- use_module(library(apply)).
:- use_module(library(solution_sequences)).

A Replicated Data Store

This module provides a replicated data store that is coordinated using a variation on Lamport's Paxos consensus protocol. The original method is described in his paper entitled "The Part-time Parliament", which was published in 1998. The algorithm is tolerant of non-Byzantine failure: that is, late or lost delivery or reply, but not senseless delivery or reply. The present algorithm takes advantage of the convenience offered by multicast to the quorum's membership, who can remain anonymous and who can come and go as they please without affecting Liveness or Safety properties.

Paxos' quorum is a set of one or more attentive members, whose processes respond to queries within some known time limit (< 20ms), which includes roundtrip delivery delay. This property is easy to satisfy given that every coordinator is necessarily a member of the quorum as well, and a quorum of one is permitted. An inattentive member (e.g. one whose actions are late or lost) is deemed to be "not-present" for the purposes of the present transaction and consistency cannot be assured for that member. As long as there is at least one attentive member of the quorum, persistence of the database is assured.

Each member maintains a ledger of terms along with information about when they were originally recorded. The member's ledger is deterministic; that is to say, there can only be one entry per functor/arity combination. No member will accept a new term proposal that has a line number equal to or lower than the one already recorded in the ledger.
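A minimal sketch of this acceptance rule, assuming a ledger kept in an AVL tree from library(assoc) and keyed by an arbitrary term (in this library the key of a compound is its name/arity pair). The predicate ledger_accept/5 is a hypothetical illustration, not the library's ledger implementation.

```prolog
:- use_module(library(assoc)).

%   Hypothetical helper, not the library's ledger implementation:
%   record Value for Key at line Gen only if Gen is strictly higher
%   than the line already recorded for Key. The ledger holds at most
%   one Gen-Value entry per key.
ledger_accept(Key, Gen, Value, Ledger0, Ledger) :-
    (   get_assoc(Key, Ledger0, Gen0-_)
    ->  Gen > Gen0                      % equal or lower lines are refused
    ;   true                            % unknown key: any line is accepted
    ),
    put_assoc(Key, Ledger0, Gen-Value, Ledger).
```

Replacing the entry rather than adding a second one is what keeps the ledger at one entry per key.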

Paxos is a three-phase protocol:

1: A coordinator first prepares the quorum for a new proposal by broadcasting a proposed term. Each member responds by returning the last line number recorded in its ledger for that functor/arity combination.
2: The coordinator selects the highest line number it receives, increments it by one, and then asks the quorum to accept the new term with the new line number. Each member checks its ledger once again and, if there is still no entry for that functor/arity combination at a line equal to or higher than the specified line, records the term in the ledger at the specified line. The member indicates consent by returning the specified line number back to the coordinator. If consent is withheld by a member, then the member returns a nack instead. The coordinator requires unanimous consent. If it isn't achieved then the proposal fails and the coordinator must start over from the beginning.
3: Finally, the coordinator concludes the successful negotiation by broadcasting the agreement to the quorum in the form of a paxos_changed(Key,Value) event. This is the only event that should be of interest to user programs.
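The first two phases can be modelled for a single key in a few lines. This sketch assumes each member's ledger entry is reduced to its line number (0 for an unknown key); paxos_round/3 and accept/3 are hypothetical names, and the network, the timeouts and the phase 3 broadcast are left out.

```prolog
:- use_module(library(lists)).
:- use_module(library(apply)).

%   Hypothetical model of one ballot for a single key. Gens0 lists the
%   line number each quorum member has recorded for the key (0 when the
%   key is unknown). The phase 3 broadcast is not modelled.
paxos_round(Gens0, Line, Gens) :-
    max_list(Gens0, Highest),           % phase 1: highest line reported
    succ(Highest, Line),                % propose the next line
    maplist(accept(Line), Gens0, Gens). % phase 2: every member must consent

%   A member consents only if Line is higher than the line it has
%   recorded; failure plays the role of a nack.
accept(Line, Recorded, Line) :-
    Line > Recorded.
```

Because the model has no competing coordinator, every member consents. In the real protocol another proposal can raise a member's line between the two phases, which is what turns a consent into a nack.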

For practical reasons, we rely on the partially synchronous behavior (i.e., a bounded time for replies) of broadcast_request/1 over TIPC to ensure Progress. Perhaps more importantly, we rely on the fact that the TIPC broadcast listener state machine guarantees the atomicity of broadcast_request/1 at the process level, thus obviating the need for external mutual exclusion mechanisms.

Note that this algorithm does not guarantee the correctness of the value proposed. It only guarantees that, if successful, the value proposed is identical for all attentive members of the quorum.

author
- Jeffrey Rosenwald (JeffRose@acm.org)
See also
- tipc_broadcast.pl, udp_broadcast.pl
license
- BSD-2
:- meta_predicate
    paxos_on_change(?, 0),
    paxos_on_change(?, ?, 0).

:- multifile
    paxos_message_hook/3,               % +PaxOS, +TimeOut, -Message
    paxos_ledger_hook/5.                % +Op, ?Key, ?Gen, ?Value, ?Status

:- setting(max_sets, nonneg, 20,
           "Max Retries to get to an agreement").
:- setting(max_gets, nonneg, 5,
           "Max Retries to get a value from the forum").
:- setting(response_timeout, float, 0.020,
           "Max time to wait for a response").
:- setting(replication_rate, number, 1000,
           "Number of keys replicated per second").
:- setting(death_half_life, number, 10,
           "Half-life for failure score").
:- setting(death_score, number, 100,
           "Consider a node dead if cumulative failure \c
            score exceeds this number").
 paxos_initialize(+Options) is det
Initialize this Prolog process as a paxos node. The initialization requires an initialized and configured TIPC, UDP or other broadcast protocol. Calling this initialization may be omitted, in which case the equivalent of paxos_initialize([]) is executed lazily as part of the first paxos operation. Defined options:
node(?NodeID)
When instantiated, this node rejoins the network with the given node id. A fixed node id should be used if the node is configured for persistency; it causes the new node to receive updates for keys that have been created or modified since the node left the network. If NodeID is a variable it is unified with the discovered NodeID.

NodeID must be a small non-negative integer as these identifiers are used in bitmaps.

:- dynamic  paxos_initialized/0.
:- volatile paxos_initialized/0.

paxos_initialize(_Options) :-
    paxos_initialized,
    !.
paxos_initialize(Options) :-
    with_mutex(paxos, paxos_initialize_sync(Options)).

paxos_initialize_sync(_Options) :-
    paxos_initialized,
    !.
paxos_initialize_sync(Options) :-
    at_halt(paxos_leave),
    listen(paxos, paxos(X), paxos_message(X)),
    paxos_assign_node(Options),
    start_replicator,
    asserta(paxos_initialized).

paxos_initialize :-
    paxos_initialize([]).


		 /*******************************
		 *            ADMIN		*
		 *******************************/
 paxos_get_admin(+Name, -Value) is semidet
 paxos_set_admin(+Name, +Value) is semidet
Get and set administrative keys. We use a wrapper so that we can hide the key identity.
paxos_admin_key(quorum, '$paxos_quorum').
paxos_admin_key(dead,   '$paxos_dead_nodes').

paxos_get_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_get(Key, Value).

paxos_set_admin(Name, Value) :-
    paxos_admin_key(Name, Key),
    paxos_set(Key, Value).

paxos_set_admin_bg(Name, Value) :-
    thread_create(ignore(paxos_set_admin(Name, Value)), _,
                  [ detached(true)
                  ]).


		 /*******************************
		 *           NODE DATA		*
		 *******************************/
 node(?NodeId)
 quorum(?Bitmap)
 dead(?Bitmap)
 failed(?Bitmap)
 failed(?NodeId, ?LastTried, ?Score)
Track our identity as well as the status of our peers in the network. NodeId is a small integer. Multiple NodeIds are combined in a Bitmap.
:- dynamic
    node/1,                             % NodeID
    quorum/1,                           % Bitmap
    failed/1,                           % Bitmap
    failed/3,                           % NodeID, LastTried, Score
    leaving/0,                          % Node is leaving
    dead/1,                             % Bitmap
    salt/1.                             % Unique key
:- volatile
    node/1,
    quorum/1,
    failed/1,
    failed/3,
    leaving/0,
    dead/1,
    salt/1.
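Node sets are plain integers used as bitmaps, so set operations are bitwise arithmetic. A minimal sketch with hypothetical helper names, using the same expressions as set_quorum/2 and paxos_update_set/2 (`1<<Node` to add, `/\ \(1<<Node)` to remove) and lsb/1 to enumerate members:

```prolog
:- use_module(library(apply)).

%   Hypothetical helpers: a node set is an integer with bit N set for
%   every member node N.
nodes_bitmap(Nodes, Bitmap) :-
    foldl(add_node, Nodes, 0, Bitmap).

add_node(Node, B0, B) :-                % as in set_quorum/2
    B is B0 \/ (1<<Node).

del_node(Node, B0, B) :-                % as in paxos_update_set/2
    B is B0 /\ \(1<<Node).

%   Enumerate members by repeatedly removing the lowest set bit,
%   the same walk consider_dead/1 performs over a failure bitmap.
bitmap_nodes(0, []) :-
    !.
bitmap_nodes(Bitmap, [Node|Nodes]) :-
    Node is lsb(Bitmap),
    del_node(Node, Bitmap, Rest),
    bitmap_nodes(Rest, Nodes).
```

For example, nodes 0, 2 and 5 form the bitmap 37 (0x25), which is also how the debug messages print a quorum with `0x~16r`.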
 paxos_assign_node(+Options) is det
Assign a node for this paxos instance. If node is given as an option, this is the node id that is used. Otherwise the network is analysed and the system selects a new node.
paxos_assign_node(Options) :-
    (   option(node(Node), Options)
    ->  node(Node)
    ;   node(_)
    ),                                          % already done
    !.
paxos_assign_node(Options) :-
    between(1, 20, Retry),
    option(node(Node), Options, Node),
    (   node(_)
    ->  permission_error(set, paxos_node, Node)
    ;   true
    ),
    retractall(dead(_)),
    retractall(quorum(_)),
    retractall(failed(_)),
    retractall(failed(_,_,_)),
    retractall(leaving),
    Salt is random(1<<63),
    asserta(salt(Salt)),
    paxos_message(node(N,Q,D):From, 0.25, NodeQuery),
    findall(t(N,Q,D,From),
            broadcast_request(NodeQuery),
            Network),
    select(t(self,0,Salt,Me), Network, AllNodeStatus),
    partition(starting, AllNodeStatus, Starting, Running),
    nth_starting(Starting, Salt, Offset),
    retractall(salt(_)),
    debug(paxos(node), 'Me@~p; starting: ~p; running: ~p',
          [Me, Starting, Running]),
    arg_union(2, Running, Quorum),
    arg_union(3, Running, Dead),
    (   var(Node)
    ->  (   call_nth(( between(0, 1000, Node),
                       \+ memberchk(t(Node,_,_,_), Running),
                       Dead /\ (1<<Node) =:= 0),
                     Offset)
        ->  debug(paxos(node), 'Assigning myself node ~d', [Node])
        ;   resource_error(paxos_nodes)
        )
    ;   memberchk(t(Node,_,_,_), Running)
    ->  permission_error(set, paxos_node, Node)
    ;   Rejoin = true
    ),
    asserta(node(Node)),
    (   claim_node(Node, Me)
    ->  !,
        asserta(dead(Dead)),
        set_quorum(Node, Quorum),
        (   Rejoin == true
        ->  paxos_rejoin
        ;   true
        )
    ;   debug(paxos(node), 'Node ~p already claimed; retrying (~p)',
              [Node, Retry]),
        retractall(node(Node)),
        fail
    ).

starting(t(self,_Quorum,_Salt,_Address)).

nth_starting(Starting, Salt, N) :-
    maplist(arg(3), Starting, Salts),
    sort([Salt|Salts], Sorted),
    nth1(N, Sorted, Salt),
    !.

claim_node(Node, Me) :-
    paxos_message(claim_node(Node, Ok):From, 0.25, NodeQuery),
    forall((   broadcast_request(NodeQuery),
               From \== Me,
               debug(paxos(node), 'Claim ~p ~p: ~p', [Node, From, Ok])
           ),
           Ok == true).

set_quorum(Node, Quorum0) :-
    Quorum is Quorum0 \/ (1<<Node),
    debug(paxos(node), 'Adding ~d to quorum (now 0x~16r)', [Node, Quorum]),
    asserta(quorum(Quorum)),
    paxos_set_admin(quorum, Quorum).
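The free-node search in paxos_assign_node/1 can be isolated as follows. This is a hypothetical stand-alone version, free_node/4, in which the running nodes are given as a plain list of ids rather than t/4 records. Offset is the rank of this process's salt among the processes that are starting at the same moment, so concurrent starters that see the same network skip to different free ids.

```prolog
:- use_module(library(solution_sequences)).

%   Hypothetical stand-alone version of the selection made in
%   paxos_assign_node/1. Running is a list of node ids in use, Dead a
%   bitmap of nodes believed dead and Offset the rank of this process
%   among the processes that are starting concurrently.
free_node(Running, Dead, Offset, Node) :-
    call_nth(( between(0, 1000, Node),
               \+ memberchk(Node, Running),     % id not taken
               Dead /\ (1<<Node) =:= 0          % and not reserved by a dead node
             ),
             Offset).
```

With nodes 0 and 1 running and node 3 marked dead, the first starter picks 2 and the second picks 4; claim_node/2 then still has to confirm that nobody else claimed the id.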
 paxos_rejoin
Re-join the network. Tasks:
paxos_rejoin :-
    node(Node),
    repeat,
        (   paxos_get_admin(dead, Dead0)
        ->  Dead is Dead0 /\ \(1<<Node),
            (   Dead == Dead0
            ->  true
            ;   paxos_set_admin(dead, Dead)
            )
        ;   true
        ),
    !.
 paxos_leave is det
 paxos_leave(+Node) is det
Leave the network. The predicate paxos_leave/0 is called from at_halt/1 to ensure the node is deleted as the process dies. The paxos_leave/1 version is called to discard other nodes if they repeatedly did not respond to queries.
paxos_leave :-
    node(Node),
    !,
    asserta(leaving),
    paxos_leave(Node),
    Set is 1<<Node,
    paxos_message(forget(Set), -, Forget),
    broadcast(Forget),
    unlisten(paxos),
    retractall(leaving).
paxos_leave.

paxos_leave(Node) :-
    !,
    paxos_update_set(quorum, del(Node)),
    paxos_update_set(dead,   add(Node)).
paxos_leave(_).

paxos_update_set(Set, How) :-
    repeat,
      Term =.. [Set,Value],
      call(Term),
      (   How = add(Node)
      ->  NewValue is Value \/  (1<<Node)
      ;   How = del(Node)
      ->  NewValue is Value /\ \(1<<Node)
      ),
      (   Value == NewValue
      ->  true
      ;   paxos_set_admin(Set, NewValue)
      ->  true
      ;   leaving
      ),
    !.

		 /*******************************
		 *          NODE STATUS		*
		 *******************************/
 update_failed(+Action, +Quorum, +Alive) is det
We just sent the Quorum a message and got a reply from the set Alive.
Arguments:
Action- is one of set, get or replicate and indicates the intended action.
update_failed(Action, Quorum, Alive) :-
    Failed is Quorum /\ \Alive,
    alive(Alive),
    consider_dead(Failed),
    (   failed(Failed)
    ->  true
    ;   (   clause(failed(_Old), true, Ref)
        ->  asserta(failed(Failed)),
            erase(Ref),
            debug(paxos(node), 'Updated failed quorum to 0x~16r', [Failed])
        ;   asserta(failed(Failed))
        ),
        (   Action == set
        ->  start_replicator
        ;   true
        )
    ).

consider_dead(0) :-
    !.
consider_dead(Failed) :-
    Node is lsb(Failed),
    consider_dead1(Node),
    Rest is Failed /\ \(1<<Node),
    consider_dead(Rest).

consider_dead1(Node) :-
    clause(failed(Node, Last, Score), true, Ref),
    !,
    setting(death_half_life, HalfLife),
    setting(death_score, DeathScore),
    get_time(Now),
    Passed is Now-Last,
    NewScore is Score*(2**(-Passed/HalfLife)) + 10,
    asserta(failed(Node, Now, NewScore)),
    erase(Ref),
    (   NewScore > DeathScore           % dead once the score exceeds death_score
    ->  debug(paxos(node), 'Consider node ~d dead', [Node]),
        paxos_leave(Node)
    ;   true
    ).
consider_dead1(Node) :-
    get_time(Now),
    asserta(failed(Node, Now, 10)).

alive(Bitmap) :-
    (   clause(failed(Node, _Last, _Score), true, Ref),
        Bitmap /\ (1<<Node) =\= 0,
        erase(Ref),
        fail
    ;   true
    ).
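The failure score kept by consider_dead1/1 decays exponentially: the previous score halves every death_half_life seconds and each new failure adds 10. A minimal sketch of that single update, as a hypothetical helper decayed_score/4 (Passed and HalfLife in seconds):

```prolog
%   Hypothetical helper reproducing the score update in consider_dead1/1:
%   decay the previous score by 2^(-Passed/HalfLife), then add 10 for
%   the failure that is being recorded now.
decayed_score(Score0, Passed, HalfLife, Score) :-
    Score is Score0*(2**(-Passed/HalfLife)) + 10.
```

With the default half-life of 10 seconds, a node that misses a request about every 10 seconds settles at a score of 20 (S = S/2 + 10), while one that misses a request about once per second approaches 10/(1 - 2^(-1/10)), roughly 149, and so eventually passes the default death_score of 100.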
 life_quorum(-Quorum, -LifeQuorum) is det
Find the Quorum and the living nodes from the Quorum. This is the set for which we wait. If the LifeQuorum is not a majority we address the whole Quorum.
To be done
- At some point in time we must remove a node from the quorum.
life_quorum(Quorum, LifeQuorum) :-
    quorum(Quorum),
    (   failed(Failed),
        Failed \== 0,
        LifeQuorum is Quorum /\ \Failed,
        majority(LifeQuorum, Quorum)
    ->  true
    ;   LifeQuorum = Quorum
    ).
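A pure version of the same decision, with the quorum and failure bitmaps passed in instead of read from the database, makes the rule easy to check. life_quorum_of/3 is a hypothetical name; the threshold is the one used by majority/2.

```prolog
%   Hypothetical pure variant of life_quorum/2: drop the failed nodes
%   from the quorum, but only if the survivors still form a majority;
%   otherwise keep waiting for the whole quorum.
life_quorum_of(Quorum, Failed, LifeQuorum) :-
    Alive is Quorum /\ \Failed,
    (   Failed =\= 0,
        popcount(Alive) >= (popcount(Quorum)+2)//2
    ->  LifeQuorum = Alive
    ;   LifeQuorum = Quorum
    ).
```

In a three-node quorum, one failed node leaves a two-node life quorum; with two failed nodes the single survivor is not a majority, so the whole quorum is addressed again.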


		 /*******************************
		 *        NETWORK STATUS	*
		 *******************************/

:- paxos_admin_key(quorum, Key),
   listen(paxos_changed(Key, Quorum),
          update_quorum(Quorum)).
:- paxos_admin_key(dead, Key),
   listen(paxos_changed(Key, Death),
          update_dead(Death)).

update_quorum(Proposed) :-
    debug(paxos(node), 'Received quorum proposal 0x~16r', [Proposed]),
    quorum(Proposed),
    !.
update_quorum(Proposed) :-
    leaving,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =\= 0,
    !,
    update(quorum(Proposed)).
update_quorum(Proposed) :-
    node(Node),
    NewQuorum is Proposed \/ (1<<Node),
    update(quorum(NewQuorum)),
    debug(paxos(node), 'I''m not in the quorum! Proposing 0x~16r', [NewQuorum]),
    paxos_set_admin_bg(quorum, NewQuorum).

update_dead(Proposed) :-
    debug(paxos(node), 'Received dead proposal 0x~16r', [Proposed]),
    dead(Proposed),
    !.
update_dead(Proposed) :-
    leaving,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    Proposed /\ (1<<Node) =:= 0,
    !,
    update(dead(Proposed)).
update_dead(Proposed) :-
    node(Node),
    NewDead is Proposed /\ \(1<<Node),
    update(dead(NewDead)),
    paxos_set_admin_bg(dead, NewDead).

update(Clause) :-
    functor(Clause, Name, Arity),
    functor(Generic, Name, Arity),
    (   clause(Generic, true, Ref)
    ->  asserta(Clause),
        erase(Ref)
    ;   asserta(Clause)
    ).
 paxos_property(?Property)
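True if Property is a current property for the paxos network. Defined properties are:
node(?NodeID)
The (integer) node id of this node.
quorum(?Bitmap)
The current quorum, as a bitmap of node ids.
failed(?Bitmap)
The members of the quorum that did not reply to the most recent request, as a bitmap of node ids.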
paxos_property(node(NodeID)) :-
    node(NodeID).
paxos_property(quorum(Quorum)) :-
    quorum(Quorum).
paxos_property(failed(Nodes)) :-
    failed(Nodes).


		 /*******************************
		 *         INBOUND EVENTS	*
		 *******************************/
 paxos_message(?Message)
Handle inbound actions from our peers. Defined values for Message are:
prepare(+Key, -Node, -Gen, +Value)
A request message to set Key to Value. Returns in Gen the current generation at which we have a value (or 0 if we have none) and in Node our node id.
accept(+Key, -Node, +Gen, -GenA, +Value)
A request message to set Key to Value if Gen is newer than the generation we have for Key. In that case GenA is Gen. Otherwise we reject using GenA = nack.
changed(+Key, +Gen, +Value, +Acceptors)
The leader got enough accepts for setting Key to Value at Gen. Acceptors is the set of nodes that accepted this value.
learn(+Key, -Node, +Gen, -GenA, +Value)
Request message performing phase one for replication to learner nodes.
learned(+Key, +Gen, +Value, +Acceptors)
Phase two of the replication. Confirm the newly learned knowledge.
retrieve(+Key, -Node, -Gen, -Value)
A request message to retrieve our value for Key. Also provides our node id and the generation.
forget(+Nodes)
Forget the existence of Nodes.
node(-Node, -Quorum, -Dead)
Get my view of the network. Node is the (integer) node id of this node, Quorum is this node's view of the quorum and Dead is its view of the non-responsive nodes.
To be done
- Originally, changed was handled by a get and, when that was not successful, by a new set, named paxos_audit. I don't really see why we need this.
paxos_message(prepare(Key,Node,Gen,Value)) :-
    node(Node),
    (   ledger(Key, Gen, _)
    ->  true
    ;   Gen = 0,
        ledger_create(Key, Gen, Value)
    ),
    debug(paxos, 'Prepared ~p-~p@~d', [Key,Value,Gen]).
paxos_message(accept(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    (   ledger_update(Key, Gen, Value)
    ->  debug(paxos, 'Accepted ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p-~p@~d', [Key,Value,Gen]),
        GenA = nack
    ).
paxos_message(changed(Key,Gen,Value,Acceptors)) :-
    debug(paxos, 'Changed ~p-~p@~d for ~p', [Key, Value, Gen, Acceptors]),
    ledger_update_holders(Key,Gen,Acceptors),
    broadcast(paxos_changed(Key,Value)).
paxos_message(learn(Key,Node,Gen,GenA,Value)) :-
    node(Node),
    debug(paxos, 'Learn ~p-~p@~p?', [Key, Value, Gen]),
    (   ledger_learn(Key,Gen,Value)
    ->  debug(paxos, 'Learned ~p-~p@~d', [Key,Value,Gen]),
        GenA = Gen
    ;   debug(paxos, 'Rejected ~p@~d', [Key, Gen]),
        GenA = nack
    ).
paxos_message(learned(Key,Gen,_Value,Acceptors)) :-
    ledger_update_holders(Key,Gen,Acceptors).
paxos_message(retrieve(Key,Node,K,Value)) :-
    node(Node),
    debug(paxos, 'Retrieving ~p', [Key]),
    ledger(Key,K,Value),
    debug(paxos, 'Retrieved ~p-~p@~d', [Key,Value,K]),
    !.
paxos_message(forget(Nodes)) :-
    ledger_forget(Nodes).
paxos_message(node(Node,Quorum,Dead)) :-
    (   node(Node),
        quorum(Quorum),
        dead(Dead)
    ->  true
    ;   salt(Salt),
        Node = self,
        Quorum = 0,
        Dead = Salt
    ).
paxos_message(claim_node(Node, Ok)) :-
    (   node(Node)
    ->  Ok = false
    ;   Ok = true
    ).
paxos_message(ask(Node, Message)) :-
    node(Node),
    broadcast_request(Message).


		 /*******************************
		 *     KEY-VALUE OPERATIONS	*
		 *******************************/
 paxos_set(+Term) is semidet
Equivalent to paxos_key(Term,Key), paxos_set(Key,Term). I.e., Term is a ground compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.
 paxos_set(+Key, +Value) is semidet
 paxos_set(+Key, +Value, +Options) is semidet
negotiates to have Key-Value recorded in the ledger for each of the quorum's members. This predicate succeeds if the quorum unanimously accepts the proposed term. If no such entry exists in the Paxon's ledger, then one is silently created. paxos_set/1 will retry the transaction several times (default: 20) before failing. Failure is rare and is usually the result of a collision of two or more writers writing to the same term at precisely the same time. On failure, it may be useful to wait some random period of time, and then retry the transaction. By specifying a retry count of zero, paxos_set/2 will succeed iff the first ballot succeeds.

On success, paxos_set/1 will also broadcast the term paxos_changed(Key,Value) to the quorum.

Options processed:

retry(Retries)
is a non-negative integer specifying the number of retries that will be performed before a set is abandoned. Defaults to the setting max_sets (20).
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
Arguments:
Term- is a ground compound term.
To be done
- If the Value is already current, should we simply do nothing?
paxos_set(Term) :-
    paxos_key(Term, Key),
    paxos_set(Key, Term, []).

paxos_set(Key, Value) :-
    paxos_set(Key, Value, []).

paxos_set(Key, Value, Options) :-
    must_be(ground, Key-Value),
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_sets),
    apply_default(TMO, response_timeout),
    paxos_message(prepare(Key,Np,Rp,Value), TMO, Prepare),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      Alive \== 0,
      debug(paxos, 'Set: ~p -> ~p', [Key, Value]),
      collect(Quorum, false, Np, Rp, Prepare, Rps, PrepNodes),
      debug(paxos, 'Set: quorum: 0x~16r, prepared by 0x~16r, gens ~p',
            [Quorum, PrepNodes, Rps]),
      majority(PrepNodes, Quorum),
      max_list(Rps, K),
      succ(K, K1),
      paxos_message(accept(Key,Na,K1,Ra,Value), TMO, Accept),
      collect(Alive, Ra == nack, Na, Ra, Accept, Ras, AcceptNodes),
      majority(AcceptNodes, Quorum),
      intersecting(PrepNodes, AcceptNodes),
      c_element(Ras, K, K1),
      broadcast(paxos(log(Key,Value,AcceptNodes,K1))),
      paxos_message(changed(Key,K1,Value,AcceptNodes), -, Changed),
      broadcast(Changed),
      update_failed(set, Quorum, AcceptNodes),
    !.

apply_default(Var, Setting) :-
    var(Var),
    !,
    setting(Setting, Var).
apply_default(_, _).

majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.

intersecting(Set1, Set2) :-
    Set1 /\ Set2 =\= 0.
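majority/2 requires strictly more than half of the quorum: (N+2)//2 equals floor(N/2)+1. Because any two such majorities of the same quorum together have more than N members, they must share at least one node, which is the property intersecting/2 checks for the prepare and accept sets. The sketch below tabulates the threshold with a hypothetical majority_threshold/2 and repeats majority/2 from the listing so that it runs on its own.

```prolog
%   Hypothetical helper: smallest number of replies that majority/2
%   accepts for a quorum of N nodes, i.e. floor(N/2)+1.
majority_threshold(N, Min) :-
    Min is (N+2)//2.

%   Copied from the listing above so this block is self-contained.
majority(SubSet, Set) :-
    popcount(SubSet) >= (popcount(Set)+2)//2.
```

For quorums of 1 to 5 nodes the thresholds are 1, 2, 2, 3 and 3, so adding a fourth node to a three-node quorum raises the number of required replies.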
 collect(+Quorum, :Stop, ?Node, ?Template, ?Message, -Result, -NodeSet) is semidet
Perform a broadcast request using Message. Node and Template share with Message and extract the replying node and the result value from Message. Result is the list of instantiations for Template received and NodeSet is the set (bitmask) of Node values that replied, i.e. |NodeSet| is length(Result). The transfer stops if all members of the set Quorum responded or the configured timeout passed. It also stops, without recording that reply, as soon as Stop succeeds for a reply.
collect(Quorum, Stop, Node, Template, Message, Result, NodeSet) :-
    State = state(0),
    L0 = [dummy|_],
    Answers = list(L0),
    (   broadcast_request(Message),
        debug(paxos(request), 'broadcast_request: ~p', [Message]),
        (   Stop
        ->  !,
            fail
        ;   true
        ),
        duplicate_term(Template, Copy),
        NewLastCell = [Copy|_],
        arg(1, Answers, LastCell),
        nb_linkarg(2, LastCell, NewLastCell),
        nb_linkarg(1, Answers, NewLastCell),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum
    ->  true
    ;   true
    ),
    arg(1, State, NodeSet),
    arg(1, Answers, [_]),               % close the answer list
    L0 = [_|Result].
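The early-stop bookkeeping in collect/7 relies on nb_setarg/3, whose update survives backtracking into broadcast_request/1. A reduced, hypothetical replied_until/4 shows only that part, with member/2 standing in for the stream of replies; the answer list built with nb_linkarg/3 and the Stop condition are left out.

```prolog
%   Hypothetical reduction of collect/7: run Goal on backtracking, OR the
%   bit of each Node it yields into a state term with nb_setarg/3 (which
%   is not undone on backtracking) and stop once all of Quorum replied.
replied_until(Quorum, Node, Goal, NodeSet) :-
    State = state(0),
    (   call(Goal),
        arg(1, State, Replied0),
        Replied is Replied0 \/ (1<<Node),
        nb_setarg(1, State, Replied),
        Quorum /\ Replied =:= Quorum
    ->  true                            % everyone answered: stop early
    ;   true                            % Goal exhausted (e.g. a timeout)
    ),
    arg(1, State, NodeSet).
```

If all quorum members reply, the if-then-else commits and further replies are never requested; otherwise the loop runs until Goal has no more solutions and the partial NodeSet is returned.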
 paxos_quorum_ask(?Template, +Message, -Result, +Options)
Ask the paxos forum for their opinion. This predicate is not really part of the paxos protocol, but notably reuses the quorum maintenance mechanism of this library for asking questions to the quorum (cluster). Message is the message to be asked. Result is a list of copies of Template from the quorum. Options:
timeout(+Seconds)
Max time to wait for a reply. Default is the setting response_timeout.
node(?Node)
Can be used to include the replying node into Template.
quorum(?Quorum)
Set/query the interviewed quorum as a bitmask
paxos_quorum_ask(Template, Message, Result, Options) :-
    option(timeout(TMO), Options, TMO),
    option(node(Node), Options, _),
    option(quorum(Quorum), Options, Quorum),
    apply_default(TMO, response_timeout),
    (   var(Quorum)
    ->  life_quorum(Quorum, _Alive)
    ;   true
    ),
    paxos_message(ask(Node, Message), TMO, BroadcastMessage),
    collect(Quorum, false, Node, Template, BroadcastMessage, Result, _PrepNodes).
 paxos_get(?Term) is semidet
Equivalent to paxos_key(Term,Key), paxos_get(Key,Term). I.e., Term is a compound term and its key is the name/arity pair. This version provides compatibility with older versions of this library.
 paxos_get(+Key, -Value) is semidet
 paxos_get(+Key, -Value, +Options) is semidet
unifies Value (Term for paxos_get/1) with the entry retrieved from the Paxon's ledger. If no such entry exists in the member's local cache, then the quorum is asked to provide a value, which is verified for consistency. An implied paxos_set/2 follows. This predicate succeeds if a term with the same functor and arity exists in the Paxon's ledger, and fails otherwise.

Options processed:

retry(Retries)
is a non-negative integer specifying the number of retries that will be performed before a get is abandoned. Defaults to the setting max_gets (5).
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
Arguments:
Term- is a compound. Any unbound variables are unified with those provided in the ledger entry.
paxos_get(Term) :-
    paxos_key(Term, Key),
    paxos_get(Key, Term, []).
paxos_get(Key, Value) :-
    paxos_get(Key, Value, []).

paxos_get(Key, Value, _) :-
    ledger(Key, _Line, Value),
    !.
paxos_get(Key, Value, Options) :-
    paxos_initialize,
    option(retry(Retries), Options, Retries),
    option(timeout(TMO), Options, TMO),
    apply_default(Retries, max_gets),
    apply_default(TMO, response_timeout),
    Msg = Line-Value,
    paxos_message(retrieve(Key,Nr,Line,Value), TMO, Retrieve),
    node(Node),
    between(0, Retries, _),
      life_quorum(Quorum, Alive),
      QuorumA is Alive /\ \(1<<Node),
      collect(QuorumA, false, Nr, Msg, Retrieve, Terms, RetrievedNodes),
      debug(paxos, 'Retrieved: ~p from 0x~16r', [Terms, RetrievedNodes]),
      highest_vote(Terms, _Line-MajorityValue, Count),
      debug(paxos, 'Best: ~p with ~d votes', [MajorityValue, Count]),
      Count >= (popcount(QuorumA)+2)//2,
      debug(paxos, 'Retrieve: accept ~p', [MajorityValue]),
      update_failed(get, Quorum, RetrievedNodes),
      paxos_set(Key, MajorityValue),    % Is this needed?
    !.

highest_vote(Terms, Term, Count) :-
    msort(Terms, Sorted),
    count_votes(Sorted, Counted),
    sort(1, >, Counted, [Count-Term|_]).

count_votes([], []).
count_votes([H|T0], [N-H|T]) :-
    count_same(H, T0, 1, N, R),
    count_votes(R, T).

count_same(H, [Hc|T0], C0, C, R) :-
    H == Hc,
    !,
    C1 is C0+1,
    count_same(H, T0, C1, C, R).
count_same(_, R, C, C, R).
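The vote counting in highest_vote/3 and the acceptance test in paxos_get/3 can be tried in isolation. The sketch below uses a hypothetical standalone helper, majority_value/3, which is not part of the library. It assumes replies are Gen-Value pairs as collected by paxos_get/3, and it replaces count_votes/2 with clumped/2 from library(lists). The threshold is copied from paxos_get/3, where Quorum is the bitmap of the other live nodes.

```prolog
:- use_module(library(lists)).

% majority_value(+Replies, +Quorum, -Value)   (hypothetical helper)
% Replies: list of Gen-Value pairs received from the quorum.
% Quorum:  bitmap of the nodes that were asked (bit I = node I).
% Succeeds with the most frequent reply if it reaches the threshold
% used by paxos_get/3: Count >= (popcount(Quorum)+2)//2.
majority_value(Replies, Quorum, Value) :-
    Replies \== [],
    msort(Replies, Sorted),               % identical replies become adjacent
    clumped(Sorted, Pairs),               % Reply-Count pairs
    sort(2, @>=, Pairs, [Value-Count|_]), % highest count first
    Count >= (popcount(Quorum)+2)//2.
```

For example, with three other nodes (Quorum = 0b111), two matching replies are enough, because (3+2)//2 = 2.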
 paxos_key(+Term, -Key) is det
Provides compatibility for paxos_get/1, paxos_set/1, etc. The key of a compound term is the term '$c'(Name,Arity). We do not use Name/Arity because X/Y is naturally used to organize keys as hierarchical paths.
paxos_key(Compound, '$c'(Name,Arity)) :-
    compound(Compound), !,
    compound_name_arity(Compound, Name, Arity).
paxos_key(Compound, _) :-
    must_be(compound, Compound).
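The '$c'(Name,Arity) wrapper is easier to see with a hypothetical standalone copy of the key scheme, term_key/2. Any two compounds with the same name and arity share a key. This is what lets paxos_get(point(X,Y)) find a value stored by paxos_set(point(1,2)). A path-style key such as point/2 remains distinct.

```prolog
% term_key(+Compound, -Key)   (hypothetical copy of the paxos_key/2 scheme)
% The key depends only on the functor, never on the arguments.
term_key(Compound, '$c'(Name, Arity)) :-
    compound(Compound),
    !,
    compound_name_arity(Compound, Name, Arity).
term_key(Other, _) :-
    must_be(compound, Other).             % raises type_error(compound, Other)
```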


		 /*******************************
		 *          REPLICATION		*
		 *******************************/
 start_replicator
Start the replicator thread, or signal it that there may be outstanding replication work. This is the case if
start_replicator :-
    catch(thread_send_message(paxos_replicator, run),
          error(existence_error(_,_),_),
          fail),
    !.
start_replicator :-
    catch(thread_create(replicator, _,
                        [ alias(paxos_replicator),
                          detached(true)
                        ]),
          error(permission_error(_,_,_),_),
          true).
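start_replicator/0 uses a "signal it, else create it" idiom for a singleton worker thread. The standalone sketch below follows the same idiom, with a hypothetical alias demo_worker and a trivial worker body. It catches the same error terms as the library code. When two threads race to create the worker, the loser gets a permission error on the alias. That error is ignored because the winner will do the work.

```prolog
% ensure_worker/0: wake the worker if it exists, otherwise start it.
ensure_worker :-
    catch(thread_send_message(demo_worker, run),
          error(existence_error(_, _), _),   % no thread with that alias yet
          fail),
    !.
ensure_worker :-
    catch(thread_create(worker_loop, _,
                        [ alias(demo_worker),
                          detached(true)
                        ]),
          error(permission_error(_, _, _), _), % alias taken: lost the race
          true).

% worker_loop/0: failure-driven loop that blocks until it is signalled.
worker_loop :-
    repeat,
      thread_get_message(Msg),
      debug(demo_worker, 'Woken by ~p', [Msg]),
      fail.
```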

replicator :-
    setting(replication_rate, ReplRate),
    ReplSleep is 1/ReplRate,
    node(Node),
    debug(paxos(replicate), 'Starting replicator', []),
    State = state(idle),
    repeat,
      quorum(Quorum),
      dead(Dead),
      LifeQuorum is Quorum /\ \Dead,
      (   LifeQuorum /\ \(1<<Node) =:= 0
      ->  debug(paxos(replicate),
                'Me: ~d, Quorum: 0x~16r, Dead: 0x~16r: I''m alone, waiting ...',
                [Node, Quorum, Dead]),
          thread_get_message(_)
      ;   (   paxos_replicate_key(LifeQuorum, Key, [])
          ->  replicated(State, key(Key)),
              thread_self(Me),
              thread_get_message(Me, _, [timeout(ReplSleep)])
          ;   replicated(State, idle),
              thread_get_message(_)
          )
      ),
      fail.

replicated(State, key(_Key)) :-
    arg(1, State, idle),
    !,
    debug(paxos(replicate), 'Start replicating ...', []),
    nb_setarg(1, State, 1).
replicated(State, key(_Key)) :-
    !,
    arg(1, State, C0),
    C is C0+1,
    nb_setarg(1, State, C).
replicated(State, idle) :-
    arg(1, State, idle),
    !.
replicated(State, idle) :-
    arg(1, State, Count),
    debug(paxos(replicate), 'Replicated ~D keys', [Count]),
    nb_setarg(1, State, idle).
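replicator/0 creates State = state(idle), and replicated/2 updates it with nb_setarg/3. The non-backtrackable update is needed because the loop is failure driven: every iteration ends in fail, and a setarg/3 update would be undone on backtracking. The hypothetical predicates below count the solutions of between/3 both ways to show the difference.

```prolog
% Counter survives backtracking: nb_setarg/3 is not undone by fail.
count_with_nb_setarg(Max, Count) :-
    State = state(0),
    (   between(1, Max, _),
        arg(1, State, C0),
        C is C0 + 1,
        nb_setarg(1, State, C),
        fail
    ;   arg(1, State, Count)
    ).

% Counter is lost: each setarg/3 is undone when the loop backtracks.
count_with_setarg(Max, Count) :-
    State = state(0),
    (   between(1, Max, _),
        arg(1, State, C0),
        C is C0 + 1,
        setarg(1, State, C),
        fail
    ;   arg(1, State, Count)
    ).
```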
 paxos_replicate_key(+Nodes:bitmap, ?Key, +Options) is semidet
Replicate a Key to Nodes. If Key is unbound, a random key that still needs replication is selected. Fails if there is nothing to replicate.
timeout(+Seconds)
Max time to wait for the forum to reply. Defaults to the setting response_timeout (0.020, 20ms).
paxos_replicate_key(Nodes, Key, Options) :-
    replication_key(Nodes, Key),
    option(timeout(TMO), Options, TMO),
    apply_default(TMO, response_timeout),
    ledger_current(Key, Gen, Value, Holders),
    paxos_message(learn(Key,Na,Gen,Ga,Value), TMO, Learn),
    collect(Nodes, Ga == nack, Na, Ga, Learn, _Gas, LearnedNodes),
    NewHolders is Holders \/ LearnedNodes,
    paxos_message(learned(Key,Gen,Value,NewHolders), -, Learned),
    broadcast(Learned),
    update_failed(replicate, Nodes, LearnedNodes).
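paxos_replicate_key/3 and needs_replicate/2 represent sets of nodes as integer bitmaps, where bit I stands for node I. The helper names below are hypothetical. They restate the two expressions used in the code (Nodes /\ \Holders and Holders \/ LearnedNodes), and add a way to list the members of a bitmap, assuming bitmaps are non-negative.

```prolog
% missing_nodes(+Nodes, +Holders, -Missing): nodes in Nodes that do not
% hold the key yet; needs_replicate/2 requires this to be non-zero.
missing_nodes(Nodes, Holders, Missing) :-
    Missing is Nodes /\ \Holders.

% add_holders(+Holders, +Learned, -NewHolders): after a learn round the
% acknowledging nodes join the holders.
add_holders(Holders, Learned, NewHolders) :-
    NewHolders is Holders \/ Learned.

% bitmap_nodes(+Bitmap, -Nodes): node numbers whose bit is set.
bitmap_nodes(0, []) :-
    !.
bitmap_nodes(Bitmap, Nodes) :-
    Top is msb(Bitmap),                   % index of the highest set bit
    findall(I, ( between(0, Top, I),
                 Bitmap /\ (1 << I) =\= 0
               ),
            Nodes).
```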

replication_key(_Nodes, Key) :-
    ground(Key),
    !.
replication_key(Nodes, Key) :-
    (   Nth is 1+random(popcount(Nodes))
    ;   Nth = 1
    ),
    call_nth(needs_replicate(Nodes, Key), Nth),
    !.

needs_replicate(Nodes, Key) :-
    ledger_current(Key, _Gen, _Value, Holders),
    Nodes /\ \Holders =\= 0,
    \+ paxos_admin_key(_, Key).

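replication_key/2 spreads replication work by first asking for a random solution of needs_replicate/2. If there are fewer solutions than the random index, it falls back to the first solution. Below is a minimal generic version of that choice, under the hypothetical name random_solution/2. It assumes Bound is a positive integer, because random/1 in arithmetic needs a positive upper bound.

```prolog
:- use_module(library(solution_sequences)).   % call_nth/2

:- meta_predicate random_solution(0, +).

% random_solution(:Goal, +Bound): succeed with the Nth solution of Goal
% for a random N in 1..Bound, or with the first solution if Goal has
% fewer than N solutions. Fails if Goal has no solutions at all.
random_solution(Goal, Bound) :-
    (   Nth is 1 + random(Bound)
    ;   Nth = 1
    ),
    call_nth(Goal, Nth),
    !.
```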

		 /*******************************
		 *      KEY CHANGE EVENTS	*
		 *******************************/
 paxos_on_change(?Term, :Goal) is det
 paxos_on_change(?Key, ?Value, :Goal) is det
Executes the specified Goal when Key changes. paxos_on_change/2 listens for paxos_changed(Key,Value) notifications for Key, which are emitted as the result of successful paxos_set/3 transactions. When one is received for Key, Goal is executed in a separate thread.
Arguments:
Term- is a compound, identical to that used for paxos_get/1.
Goal- is one of:
  • a callable atom or term, or
  • the atom ignore, which causes monitoring for Term to be discontinued.
paxos_on_change(Term, Goal) :-
    paxos_key(Term, Key),
    paxos_on_change(Key, Term, Goal).

paxos_on_change(Key, Value, Goal) :-
    Goal = _:Plain,
    must_be(callable, Plain),
    (   Plain == ignore
    ->  unlisten(paxos_user, paxos_changed(Key,Value))
    ;   listen(paxos_user, paxos_changed(Key,Value),
               key_changed(Key, Value, Goal)),
        paxos_initialize
    ).

key_changed(_Key, _Value, Goal) :-
    E = error(_,_),
    catch(thread_create(Goal, _, [detached(true)]),
          E, key_error(E)).

key_error(error(permission_error(create, thread, _), _)) :-
    !.
key_error(E) :-
    print_message(error, E).

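paxos_on_change/3 is built on listen/3 and unlisten/2 from library(broadcast). The sketch below uses the same two calls with a hypothetical listener name, demo_user, and a hypothetical message changed(Key, Value). It makes one deliberate simplification: the goal runs synchronously inside broadcast/1, whereas key_changed/3 starts a detached thread for it.

```prolog
:- use_module(library(broadcast)).

:- dynamic observed/2.                % observed(Key, Value), for the demo

% watch(+Key): record every changed(Key, Value) message that is broadcast.
% Value is shared between the template and the goal.
watch(Key) :-
    listen(demo_user, changed(Key, Value),
           assertz(observed(Key, Value))).

% unwatch(+Key): stop listening, like paxos_on_change(Key, ignore).
unwatch(Key) :-
    unlisten(demo_user, changed(Key, _)).
```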

		 /*******************************
		 *            HOOKS		*
		 *******************************/
 node(-Node) is det
Get the node ID for this paxos node.
 quorum(-Quorum) is det
Get the current quorum as a bitmask.
 paxos_message(+PaxOS, +TimeOut, -BroadcastMessage) is det
Transform a basic PaxOS message into a message for the broadcasting service. This predicate is hooked by paxos_message_hook/3 with the same signature.
Arguments:
TimeOut- is either - or a time in seconds.
paxos_message(Paxos:From, TMO, Message) :-
    paxos_message_raw(paxos(Paxos):From, TMO, Message).
paxos_message(Paxos, TMO, Message) :-
    paxos_message_raw(paxos(Paxos), TMO, Message).

paxos_message_raw(Message, TMO, WireMessage) :-
    paxos_message_hook(Message, TMO, WireMessage),
    !.
paxos_message_raw(Message, TMO, WireMessage) :-
    throw(error(mode_error(det, fail,
                           paxos:paxos_message_hook(Message, TMO, WireMessage)), _)).
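paxos_message_raw/3 turns a failing hook into an error. A missing or incomplete paxos_message_hook/3 definition is therefore reported instead of silently dropping messages. The sketch below applies the same pattern to a hypothetical hook, demo_transport_hook/3. Its wire formats send/1 and request/2 are invented for illustration.

```prolog
:- multifile demo_transport_hook/3.

% Hypothetical transport hook: '-' means no reply is awaited, a number
% is a reply timeout in seconds.
demo_transport_hook(Msg, -, send(Msg)).
demo_transport_hook(Msg, TMO, request(Msg, TMO)) :-
    number(TMO).

% wire_message(+Msg, +TMO, -Wire): commit to the first hook answer and
% raise an error when the hook fails, as paxos_message_raw/3 does.
wire_message(Msg, TMO, Wire) :-
    demo_transport_hook(Msg, TMO, Wire),
    !.
wire_message(Msg, TMO, Wire) :-
    throw(error(mode_error(det, fail,
                           demo_transport_hook(Msg, TMO, Wire)), _)).
```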


		 /*******************************
		 *           STORAGE		*
		 *******************************/
 paxos_ledger_hook(+Action, ?Key, ?Gen, ?Value, ?Holders)
Hook called for all operations on the ledger. Defined actions are:
current
Enumerate our ledger content.
get
Get a single value from our ledger.
create
Create a new key in our ledger.
accept
Accept a newly proposed value for a key. Failure causes the library to send a NACK message.
set
Final acceptance of Key@Gen, providing the holders that accepted the new value.
learn
Accept new keys in a new node or node that has been offline for some time.
:- dynamic
    paxons_ledger/4.                    % Key, Gen, Value, Holders
 ledger_current(?Key, ?Gen, ?Value, ?Holders) is nondet
True when Key is a known key in my ledger.
ledger_current(Key, Gen, Value, Holders) :-
    paxos_ledger_hook(current, Key, Gen, Value, Holders).
ledger_current(Key, Gen, Value, Holders) :-
    paxons_ledger(Key, Gen, Value, Holders),
    valid(Holders).
 ledger(+Key, -Gen, -Value) is semidet
True if the ledger has Value associated with Key at generation Gen. A value that has not yet been acknowledged by the leader (its Holders is not yet an integer) must not be used, so such entries are skipped.
ledger(Key, Gen, Value) :-
    paxos_ledger_hook(get, Key, Gen, Value0, Holders),
    !,
    valid(Holders),
    Value = Value0.
ledger(Key, Gen, Value) :-
    paxons_ledger(Key, Gen, Value0, Holders),
    valid(Holders),
    !,
    Value = Value0.
 ledger_create(+Key, +Gen, +Value) is det
Create a new Key-Value pair at generation Gen. This is executed during the preparation phase.
ledger_create(Key, Gen, Value) :-
    paxos_ledger_hook(create, Key, Gen, Value, -),
    !.
ledger_create(Key, Gen, Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, created(Now))).
 ledger_update(+Key, +Gen, +Value) is semidet
Update Key to Value if the current generation is older than Gen. This reflects the accept phase of the protocol.
ledger_update(Key, Gen, Value) :-
    paxos_ledger_hook(accept, Key, Gen, Value, -),
    !.
ledger_update(Key, Gen, Value) :-
    paxons_ledger(Key, Gen0, _Value, _Holders),
    !,
    Gen > Gen0,
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, accepted(Now))),
    (   Gen0 == 0
    ->  retractall(paxons_ledger(Key, Gen0, _, _))
    ;   true
    ).
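In the accept phase, ledger_update/3 only accepts a value whose generation is newer than the one stored, and it fails for unknown keys. Below is a hypothetical in-memory version of that rule. The names kv/3 and kv_* are illustrations, and the hook and the cleanup of generation 0 are left out. As in the library, newer entries are added with asserta/1, so the first matching clause is always the most recent one.

```prolog
:- dynamic kv/3.                      % kv(Key, Gen, Value), hypothetical store

kv_create(Key, Gen, Value) :-
    asserta(kv(Key, Gen, Value)).

% kv_update(+Key, +Gen, +Value): semidet; fails if Key is unknown or if
% Gen is not newer than the most recent stored generation.
kv_update(Key, Gen, Value) :-
    kv(Key, Gen0, _),                 % most recent entry comes first
    !,
    Gen > Gen0,
    asserta(kv(Key, Gen, Value)).

kv_current(Key, Gen, Value) :-
    kv(Key, Gen, Value),
    !.
```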
 ledger_update_holders(+Key, +Gen, +Holders) is det
The leader acknowledged that Key@Gen represents a valid new
ledger_update_holders(Key, Gen, Holders) :-
    paxos_ledger_hook(set, Key, Gen, _, Holders),
    !.
ledger_update_holders(Key, Gen, Holders) :-
    clause(paxons_ledger(Key, Gen, Value, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true
    ;   asserta(paxons_ledger(Key, Gen, Value, Holders)),
        erase(Ref)
    ),
    clean_key(Holders0, Key, Gen).

clean_key(Holders, _Key, _Gen) :-
    valid(Holders),
    !.
clean_key(_, Key, Gen) :-
    (   clause(paxons_ledger(Key, Gen0, _Value, _Holders0), true, Ref),
        Gen0 < Gen,
        erase(Ref),
        fail
    ;   true
    ).
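ledger_update_holders/3 replaces a ledger clause in two steps. It first asserts the new version, then erases the old one through the clause reference obtained from clause/3. Presumably this ordering ensures that a concurrent reader always finds one version of the entry. A minimal sketch on a hypothetical fact/2 table:

```prolog
:- dynamic fact/2.                    % fact(Key, Holders), hypothetical table

% set_holders(+Key, +Holders): replace the Holders of an existing Key.
% Fails if Key is unknown. The new clause is added before the old one
% is erased, so Key never disappears from the table in between.
set_holders(Key, Holders) :-
    clause(fact(Key, Holders0), true, Ref),
    !,
    (   Holders0 == Holders
    ->  true                          % nothing changed
    ;   asserta(fact(Key, Holders)),
        erase(Ref)
    ).
```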
 ledger_learn(+Key, +Gen, +Value) is semidet
We received a learn event.
ledger_learn(Key,Gen,Value) :-
    paxos_ledger_hook(learn, Key, Gen, Value, -),
    !.
ledger_learn(Key,Gen,Value) :-
    paxons_ledger(Key, Gen0, Value0, _Holders),
    !,
    (   Gen == Gen0,
        Value == Value0
    ->  true
    ;   Gen > Gen0
    ->  get_time(Now),
        asserta(paxons_ledger(Key, Gen, Value, learned(Now)))
    ).
ledger_learn(Key,Gen,Value) :-
    get_time(Now),
    asserta(paxons_ledger(Key, Gen, Value, learned(Now))).
 ledger_forget(+Nodes) is det
Remove Nodes from all ledgers. This is executed in a background thread.
ledger_forget(Nodes) :-
    catch(thread_create(ledger_forget_threaded(Nodes), _,
                        [ detached(true)
                        ]),
          error(permission_error(create, thread, _), _),
          true).

ledger_forget_threaded(Nodes) :-
    debug(paxos(node), 'Forgetting 0x~16r', [Nodes]),
    forall(ledger_current(Key, Gen, _Value, Holders),
           ledger_forget(Nodes, Key, Gen, Holders)),
    debug(paxos(node), 'Forgotten 0x~16r', [Nodes]).

ledger_forget(Nodes, Key, Gen, Holders) :-
    NewHolders is Holders /\ \Nodes,
    (   NewHolders \== Holders,
        ledger_update_holders(Key, Gen, NewHolders)
    ->  true
    ;   true
    ).

valid(Holders) :-
    integer(Holders).
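ledger_forget_threaded/1 clears the bits of the forgotten nodes in every entry and only rewrites entries whose holders actually change. Below is a synchronous sketch of that loop over a hypothetical holder/2 table. The library instead runs it in a detached thread and goes through ledger_update_holders/3.

```prolog
:- dynamic holder/2.                  % holder(Key, HoldersBitmap), hypothetical

% forget_nodes(+Nodes): remove the nodes in bitmap Nodes from all entries.
% Under the logical update view, forall/2 iterates over the clauses that
% existed when it started, so entries asserted inside the loop are not
% revisited.
forget_nodes(Nodes) :-
    forall(holder(Key, Holders),
           forget_nodes(Nodes, Key, Holders)).

forget_nodes(Nodes, Key, Holders) :-
    NewHolders is Holders /\ \Nodes,  % clear the bits of Nodes
    (   NewHolders =\= Holders
    ->  retract(holder(Key, Holders)),
        assertz(holder(Key, NewHolders))
    ;   true                          % entry unaffected
    ).
```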


		 /*******************************
		 *             UTIL		*
		 *******************************/
 c_element(+NewList, +Old, -Value)
A Muller C-element is a logic block used in asynchronous logic. Its output assumes the value of its inputs iff all of its inputs are identical. Otherwise, the output retains its original value.
c_element([New | More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_element(_List, Old, Old).
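The C-element rule is easiest to see over a sequence of input samples. c_trace/3 below is a hypothetical driver that threads the previous output through a copy of the c_element/3 rule. The copy is named c_out/3 here so that the sketch stands alone.

```prolog
% c_out(+Inputs, +Old, -New): same rule as c_element/3.
c_out([New|More], _Old, New) :-
    forall(member(N, More), N == New),
    !.
c_out(_Inputs, Old, Old).

% c_trace(+Samples, +Out0, -Outs): feed each list of input values to the
% element in turn, starting from output Out0, and collect the outputs.
c_trace([], _Out, []).
c_trace([Inputs|Rest], Out0, [Out|Outs]) :-
    c_out(Inputs, Out0, Out),
    c_trace(Rest, Out, Outs).
```

The output only switches when both inputs agree: the samples [0,0], [1,0], [1,1], [0,1], [0,0] produce the outputs 0, 0, 1, 1, 0.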
 arg_union(+Arg, +ListOfTerms, -Set) is det
Collect argument Arg of every term in ListOfTerms and compute the union of the results. The sets are represented as integer bitmaps, so the union is a bitwise or.
arg_union(Arg, NodeStatusList, Set) :-
    maplist(arg(Arg), NodeStatusList, Sets),
    list_union(Sets, Set).

list_union(Sets, Set) :-
    list_union(Sets, 0, Set).

list_union([], Set, Set).
list_union([H|T], Set0, Set) :-
    Set1 is Set0 \/ H,
    list_union(T, Set1, Set).
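arg_union/3 and list_union/3 amount to a fold with bitwise or over one argument of each term. Below is an equivalent formulation with foldl/4 from library(apply), under the hypothetical names bitmap_arg_union/3 and bitor/3.

```prolog
:- use_module(library(apply)).        % maplist/3, foldl/4

% bitmap_arg_union(+Arg, +Terms, -Set): OR together argument Arg of
% every term in Terms; the empty list yields 0 (the empty set).
bitmap_arg_union(Arg, Terms, Set) :-
    maplist(arg(Arg), Terms, Bitmaps),
    foldl(bitor, Bitmaps, 0, Set).

bitor(Bitmap, Set0, Set) :-
    Set is Set0 \/ Bitmap.
```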