View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2024, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3,    % +Redis, +Command, -Properties
   75
   76            sentinel_slave/4            % +ServerId, +Pool, -Slave, +Options
   77          ]).   78:- autoload(library(socket), [tcp_connect/3]).   79:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   80:- autoload(library(broadcast), [broadcast/1]).   81:- autoload(library(error),
   82            [ must_be/2,
   83	      type_error/2,
   84              instantiation_error/1,
   85              uninstantiation_error/1,
   86              existence_error/2,
   87              existence_error/3
   88            ]).   89:- autoload(library(lazy_lists), [lazy_list/2]).   90:- autoload(library(lists), [append/3, member/2]).   91:- autoload(library(option), [merge_options/3, option/2,
   92			      option/3, select_option/4]).   93:- autoload(library(pairs), [group_pairs_by_key/2]).   94:- autoload(library(time), [call_with_time_limit/2]).   95:- use_module(library(debug), [debug/3, assertion/1]).   96:- use_module(library(settings), [setting/4, setting/2]).   97:- if(exists_source(library(ssl))).   98:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]).   99:- endif.  100
  101:- use_foreign_library(foreign(redis4pl)).  102
  103:- setting(max_retry_count, nonneg, 8640, % one day
  104           "Max number of retries").  105:- setting(max_retry_wait, number, 10,
  106           "Max time to wait between recovery attempts").  107:- setting(sentinel_timeout, number, 0.2,
  108	   "Time to wait for a sentinel").  109
  110:- predicate_options(redis_server/3, 3,
  111                     [ pass_to(redis:redis_connect/3, 3)
  112                     ]).  113:- predicate_options(redis_connect/3, 3,
  114                     [ reconnect(boolean),
  115                       user(atom),
  116                       password(atomic),
  117                       version(between(2,3))
  118                     ]).  119:- predicate_options(redis_disconnect/2, 2,
  120                     [ force(boolean)
  121                     ]).  122:- predicate_options(redis_scan/3, 3,
  123                     [ match(atomic),
  124                       count(nonneg),
  125                       type(atom)
  126                     ]).  127% Actually not passing, but the same
  128:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  129:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  130:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).

Redis client

This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.

In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequenly, redis/2-3 is used to issue commands on the server. For example:

?- redis_server(default, redis:6379, [password("secret")]).
?- redis(default, set(user, "Bob")).
?- redis(default, get(user), User).
User = "Bob"

*/

  151:- dynamic server/3.  152
  153:- dynamic ( connection/2,              % ServerName, Stream
  154	     sentinel/2			% Pool, Address
  155           ) as volatile.
 redis_server(+ServerName, +Address, +Options) is det
Register a redis server without connecting to it. The ServerName acts as a lazy connection alias. Initially the ServerName default points at localhost:6379 with no connect options. The default server is used for redis/1 and redis/2 and may be changed using this predicate. Options are described with redis_connect/3.

Connections established this way are by default automatically reconnected if the connection is lost for some reason unless a reconnect(false) option is specified.

  169redis_server(Alias, Address, Options) :-
  170    must_be(ground, Alias),
  171    retractall(server(Alias, _, _)),
  172    asserta(server(Alias, Address, Options)).
  173
  174server(default, localhost:6379, []).
 redis_connect(-Connection) is det
 redis_connect(+Address, -Connection, +Options) is det
redis_connect(-Connection, +Host, +Port) is det
Connect to a redis server. The main mode is redis_connect(+Address, -Connection, +Options). redis_connect/1 is equivalent to redis_connect(localhost:6379, Connection, []). Options:
reconnect(+Boolean)
If true, try to reconnect to the service when the connection seems lost. Default is true for connections specified using redis_server/3 and false for explictly opened connections.
user(+User)
If version(3) and password(Password) are specified, these are used to authenticate using the HELLO command.
password(+Password)
Authenticate using Password
version(+Version)
Specify the connection protocol version. Initially this is version 2. Redis 6 also supports version 3. When specified as 3, the HELLO command is used to upgrade the protocol.
tls(true)
When specified, initiate a TLS connection. If this option is specified we must also specify the cacert, key and cert options.
cacert(+File)
CA Certificate file to verify with.
cert(+File)
Client certificate to authenticate with.
key(+File)
Private key file to authenticate with.
sentinels(+ListOfAddresses)
Used together with an Address of the form sentinel(MasterName) to enable contacting a network of Redis servers guarded by a sentinel network.
sentinel_user(+User)
sentinel_password(+Password)
Authentication information for the senitels. When omitted we try to connect withour authentication.

Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary paralel connection or using a connection with a blocking call.

Arguments:
Address- is a term Host:Port, unix(File) or the name of a server registered using redis_server/3. The latter realises a new connection that is typically used for blocking redis commands such as listening for published messages, waiting on a list or stream.
Compatibility
- redis_connect(-Connection, +Host, +Port) provides compatibility to the original GNU-Prolog interface and is equivalent to redis_connect(Host:Port, Connection, []).
  230redis_connect(Conn) :-
  231    redis_connect(default, Conn, []).
  232
  233redis_connect(Conn, Host, Port) :-
  234    var(Conn),
  235    ground(Host), ground(Port),
  236    !,                                  % GNU-Prolog compatibility
  237    redis_connect(Host:Port, Conn, []).
  238redis_connect(Server, Conn, Options) :-
  239    atom(Server),
  240    !,
  241    (   server(Server, Address, DefaultOptions)
  242    ->  merge_options(Options, DefaultOptions, Options2),
  243        do_connect(Server, Address, Conn, [address(Address)|Options2])
  244    ;   existence_error(redis_server, Server)
  245    ).
  246redis_connect(Address, Conn, Options) :-
  247    do_connect(Address, Address, Conn, [address(Address)|Options]).
 do_connect(+Id, +Address, -Conn, +Options)
Open the connection. A connection is a compound term of the shape
redis_connection(Id, Stream, Failures, Options)
  255do_connect(Id, sentinel(Pool), Conn, Options) =>
  256    sentinel_master(Id, Pool, Conn, Options).
  257do_connect(Id, Address0, Conn, Options) =>
  258    tcp_address(Address0, Address),
  259    tcp_connect(Address, Stream0, Options),
  260    tls_upgrade(Address, Stream0, Stream, Options),
  261    Conn = redis_connection(Id, Stream, 0, Options),
  262    hello(Conn, Options).
  263
  264tcp_address(unix(Path), Path) :-
  265    !.                                  % Using an atom is ambiguous
  266tcp_address(Address, Address).
 tls_upgrade(+Address, +Raw, -Stream, +Options) is det
Upgrade to a TLS connection when tls(true) is specified.
  272:- if(current_predicate(ssl_context/3)).  273tls_upgrade(Host:_Port, Raw, Stream, Options) :-
  274    option(tls(true), Options),
  275    !,
  276    must_have_option(cacert(CacertFile), Options),
  277    must_have_option(key(KeyFile), Options),
  278    must_have_option(cert(CertFile), Options),
  279    ssl_context(client, SSL,
  280		[ host(Host),
  281		  certificate_file(CertFile),
  282		  key_file(KeyFile),
  283		  cacerts([file(CacertFile)]),
  284		  cert_verify_hook(tls_verify),
  285		  close_parent(true)
  286		]),
  287    stream_pair(Raw, RawRead, RawWrite),
  288    ssl_negotiate(SSL, RawRead, RawWrite, Read, Write),
  289    stream_pair(Stream, Read, Write).
  290:- endif.  291tls_upgrade(_, Stream, Stream, _).
  292
  293:- if(current_predicate(ssl_context/3)).
 tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status) is semidet
Accept or reject the certificate verification. Similar to the Redis command line client (redis-cli), we accept the certificate as long as it is signed, not verifying the hostname.
  301:- public tls_verify/5.  302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :-
  303    !.
  304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :-
  305    !.
  306tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :-
  307    fail.
  308
  309:- endif.
 sentinel_master(+ServerId, +SentinelPool, -Connection, +Options) is det
Discover the master and connect to it.
  315sentinel_master(Id, Pool, Master, Options) :-
  316    sentinel_connect(Id, Pool, Conn, Options),
  317    setting(sentinel_timeout, TMO),
  318    call_cleanup(
  319	query_sentinel(Pool, Conn, MasterAddr),
  320	redis_disconnect(Conn)),
  321    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
  322    do_connect(Id, MasterAddr, Master, Options),
  323    debug(redis(sentinel), 'Connected to claimed master', []),
  324    redis(Master, role, Role),
  325    (   Role = [master|_Slaves]
  326    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
  327    ;   redis_disconnect(Master),
  328	debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
  329	sleep(TMO),
  330	sentinel_master(Id, Pool, Master, Options)
  331    ).
  332
  333sentinel_connect(Id, Pool, Conn, Options) :-
  334    must_have_option(sentinels(Sentinels), Options),
  335    sentinel_auth(Options, Options1),
  336    setting(sentinel_timeout, TMO),
  337    (   sentinel(Pool, Sentinel)
  338    ;   member(Sentinel, Sentinels)
  339    ),
  340    catch(call_with_time_limit(
  341	      TMO,
  342	      do_connect(Id, Sentinel, Conn,
  343			 [sentinel(true)|Options1])),
  344	  Error,
  345	  (print_message(warning, Error),fail)),
  346    !,
  347    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
  348    redis(Conn, sentinel(sentinels, Pool), Peers),
  349    transaction(update_known_sentinels(Pool, Sentinel, Peers)).
  350
  351sentinel_auth(Options0, Options) :-
  352    option(sentinel_user(User), Options0),
  353    option(sentinel_password(Passwd), Options0),
  354    !,
  355    merge_options([user(User), password(Passwd)], Options0, Options).
  356sentinel_auth(Options0, Options) :-
  357    select_option(password(_), Options0, Options, _).
  358
  359
  360query_sentinel(Pool, Conn, Host:Port) :-
  361    redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData),
  362    MasterData = [Host,Port].
  363
  364update_known_sentinels(Pool, Sentinel, Peers) :-
  365    retractall(sentinel(Pool, _)),
  366    maplist(update_peer_sentinel(Pool), Peers),
  367    asserta(sentinel(Pool, Sentinel)).
  368
  369update_peer_sentinel(Pool, Attrs),
  370  memberchk(ip-Host, Attrs),
  371  memberchk(port-Port, Attrs) =>
  372    asserta(sentinel(Pool, Host:Port)).
  373
  374must_have_option(Opt, Options) :-
  375    option(Opt, Options),
  376    !.
  377must_have_option(Opt, Options) :-
  378    existence_error(option, Opt, Options).
 sentinel_slave(+ServerId, +Pool, -Slave, +Options) is nondet
True when Slave is a slave server in the sentinel cluster. Slave is a dict holding the keys and values as described by the Redis command
SENTINEL SLAVES mastername
  387sentinel_slave(ServerId, Pool, Slave, Options) :-
  388    sentinel_connect(ServerId, Pool, Conn, Options),
  389    redis(Conn, sentinel(slaves, Pool), Slaves),
  390    member(Pairs, Slaves),
  391    dict_create(Slave, redis, Pairs).
 hello(+Connection, +Option)
Initialize the connection. This is used to upgrade to the RESP3 protocol and/or to authenticate.
  398hello(Con, Options) :-
  399    option(version(V), Options),
  400    V >= 3,
  401    !,
  402    (   option(user(User), Options),
  403        option(password(Password), Options)
  404    ->  redis(Con, hello(3, auth, User, Password))
  405    ;   redis(Con, hello(3))
  406    ).
  407hello(Con, Options) :-
  408    option(password(Password), Options),
  409    !,
  410    redis(Con, auth(Password)).
  411hello(_, _).
 redis_stream(+Spec, --Stream, +DoConnect) is det
Get the stream to a Redis server from Spec. Spec is either the name of a registered server or a term redis_connection(Id,Stream,Failures,Options). If the stream is disconnected it will be reconnected.
  420redis_stream(Var, S, _) :-
  421    (   var(Var)
  422    ->  !, instantiation_error(Var)
  423    ;   nonvar(S)
  424    ->  !, uninstantiation_error(S)
  425    ).
  426redis_stream(ServerName, S, Connect) :-
  427    atom(ServerName),
  428    !,
  429    (   connection(ServerName, S0)
  430    ->  S = S0
  431    ;   Connect == true,
  432        server(ServerName, Address, Options)
  433    ->  redis_connect(Address, Connection, Options),
  434        redis_stream(Connection, S, false),
  435        asserta(connection(ServerName, S))
  436    ;   existence_error(redis_server, ServerName)
  437    ).
  438redis_stream(redis_connection(_,S0,_,_), S, _) :-
  439    S0 \== (-),
  440    !,
  441    S = S0.
  442redis_stream(Redis, S, _) :-
  443    Redis = redis_connection(Id,-,_,Options),
  444    option(address(Address), Options),
  445    do_connect(Id,Address,Redis2,Options),
  446    arg(2, Redis2, S0),
  447    nb_setarg(2, Redis, S0),
  448    S = S0.
  449
  450has_redis_stream(Var, _) :-
  451    var(Var),
  452    !,
  453    instantiation_error(Var).
  454has_redis_stream(Alias, S) :-
  455    atom(Alias),
  456    !,
  457    connection(Alias, S).
  458has_redis_stream(redis_connection(_,S,_,_), S) :-
  459    S \== (-).
 redis_disconnect(+Connection) is det
 redis_disconnect(+Connection, +Options) is det
Disconnect from a redis server. The second form takes one option, similar to close/2:
force(Force)
When true (default false), do not raise any errors if Connection does not exist or closing the connection raises a network or I/O related exception. This version is used internally if a connection is in a broken state, either due to a protocol error or a network issue.
  475redis_disconnect(Redis) :-
  476    redis_disconnect(Redis, []).
  477
  478redis_disconnect(Redis, Options) :-
  479    option(force(true), Options),
  480    !,
  481    (   Redis = redis_connection(_Id, S, _, _Opts)
  482    ->  (   S == (-)
  483        ->  true
  484        ;   close(S, [force(true)]),
  485            nb_setarg(2, Redis, -)
  486        )
  487    ;   has_redis_stream(Redis, S)
  488    ->  close(S, [force(true)]),
  489        retractall(connection(_,S))
  490    ;   true
  491    ).
  492redis_disconnect(Redis, _Options) :-
  493    redis_stream(Redis, S, false),
  494    close(S),
  495    retractall(connection(_,S)).
 redis(+Connection, +Request) is semidet
This predicate is overloaded to handle two types of requests. First, it is a shorthand for redis(Connection, Command, _) and second, it can be used to exploit Redis pipelines and transactions. The second form is acticated if Request is a list. In that case, each element of the list is either a term Command -> Reply or a simple Command. Semantically this represents a sequence of redis/3 and redis/2 calls. It differs in the following aspects:

Procedurally, the process takes the following steps:

  1. Send all commands
  2. Read all replies and push messages
  3. Handle all callbacks from push messages
  4. Check whether one of the replies is an error. If so, raise this error (subsequent errors are lost)
  5. Bind all replies for the Command -> Reply terms.

Examples

?- redis(default,
         [ lpush(li,1),
           lpush(li,2),
           lrange(li,0,-1) -> List
         ]).
List = ["2", "1"].
  539redis(Redis, PipeLine) :-
  540    is_list(PipeLine),
  541    !,
  542    redis_pipeline(Redis, PipeLine).
  543redis(Redis, Req) :-
  544    redis(Redis, Req, _).
 redis(+Connection, +Command, -Reply) is semidet
Execute a redis Command on Connnection. Next, bind Reply to the returned result. Command is a callable term whose functor is the name of the Redis command and whose arguments are translated to Redis arguments according to the rules below. Note that all text is always represented using UTF-8 encoding.

Reply is either a plain term (often a variable) or a term Value as Type. In the latter form, Type dictates how the Redis bulk reply is translated to Prolog. The default equals to auto, i.e., as a number of the content satisfies the Prolog number syntax and as an atom otherwise.

Redis bulk replies are translated depending on the as Type as explained above.

string
string(Encoding)
Create a SWI-Prolog string object interpreting the blob as following Encoding. Encoding is a restricted set of SWI-Prolog's encodings: bytes (iso_latin_1), utf8 and text (the current locale translation).
atom
atom(Encoding)
As above, producing an atom.
codes
codes(Encoding)
As above, producing a list of integers (Unicode code points)
chars
chars(Encoding)
As above, producing a list of one-character atoms.
integer
float
rational
number
Interpret the bytes as a string representing a number. If the string does not represent a number of the requested type a type_error(Type, String) is raised.
tagged_integer
Same as integer, but demands the value to be between the Prolog flags min_tagged_integer and max_tagged_integer, allowing the value to be used as a dict key.
auto
Same as auto(atom, number)
auto(AsText, AsNumber)
If the bulk string confirms the syntax of AsNumber, convert the value to the requested numberical type. Else convert the value to text according to AsText. This is similar to the Prolog predicate name/2.
dict_key
Alias for auto(atom,tagged_integer). This allows the value to be used as a key for a SWI-Prolog dict.
pairs(AsKey, AsValue)
Convert a map or array of even length into pairs for which the key satisfies AsKey and the value AsValue. The pairs type can also be applied to a Redis array. In this case the array length must be even. This notably allows fetching a Redis hash as pairs using HGETALL using version 2 of the Redis protocol.
dict(AsKey, AsValue)
Similar to pairs(AsKey, AsValue), but convert the resulting pair list into a SWI-Prolog dict. AsKey must convert to a valid dict key, i.e., an atom or tagged integer. See dict_key.
dict(AsValue)
Shorthand for dict(dict_key, AsValue).

Here are some simple examples

?- redis(default, set(a, 42), X).
X = status("OK").
?- redis(default, get(a), X).
X = "42".
?- redis(default, get(a), X as integer).
X = 42.
?- redis(default, get(a), X as float).
X = 42.0.
?- redis(default, set(swipl:version, 8)).
true.
?- redis(default, incr(swipl:version), X).
X = 9.
Errors
- redis_error(Code, String)
  664redis(Redis, Req, Out) :-
  665    out_val(Out, Val),
  666    redis1(Redis, Req, Out),
  667    Val \== nil.
  668
  669out_val(Out, Val) :-
  670    (   nonvar(Out),
  671        Out = (Val as _)
  672    ->  true
  673    ;   Val = Out
  674    ).
  675
  676redis1(Redis, Req, Out) :-
  677    Error = error(Formal, _),
  678    catch(redis2(Redis, Req, Out), Error, true),
  679    (   var(Formal)
  680    ->  true
  681    ;   recover(Error, Redis, redis1(Redis, Req, Out))
  682    ).
  683
  684redis2(Redis, Req, Out) :-
  685    atom(Redis),
  686    !,
  687    redis_stream(Redis, S, true),
  688    with_mutex(Redis,
  689               ( redis_write_msg(S, Req),
  690                 redis_read_stream(Redis, S, Out)
  691               )).
  692redis2(Redis, Req, Out) :-
  693    redis_stream(Redis, S, true),
  694    redis_write_msg(S, Req),
  695    redis_read_stream(Redis, S, Out).
 redis_pipeline(+Redis, +PipeLine)
  699redis_pipeline(Redis, PipeLine) :-
  700    Error = error(Formal, _),
  701    catch(redis_pipeline2(Redis, PipeLine), Error, true),
  702    (   var(Formal)
  703    ->  true
  704    ;   recover(Error, Redis, redis_pipeline(Redis, PipeLine))
  705    ).
  706
  707redis_pipeline2(Redis, PipeLine) :-
  708    atom(Redis),
  709    !,
  710    redis_stream(Redis, S, true),
  711    with_mutex(Redis,
  712               redis_pipeline3(Redis, S, PipeLine)).
  713redis_pipeline2(Redis, PipeLine) :-
  714    redis_stream(Redis, S, true),
  715    redis_pipeline3(Redis, S, PipeLine).
  716
  717redis_pipeline3(Redis, S, PipeLine) :-
  718    maplist(write_pipeline(S), PipeLine),
  719    flush_output(S),
  720    read_pipeline(Redis, S, PipeLine).
  721
  722write_pipeline(S, Command -> _Reply) :-
  723    !,
  724    redis_write_msg_no_flush(S, Command).
  725write_pipeline(S, Command) :-
  726    redis_write_msg_no_flush(S, Command).
  727
  728read_pipeline(Redis, S, PipeLine) :-
  729    E = error(Formal,_),
  730    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  731    (   var(Formal)
  732    ->  true
  733    ;   reconnect_error(E)
  734    ->  redis_disconnect(Redis, [force(true)]),
  735        throw(E)
  736    ;   resync(Redis),
  737        throw(E)
  738    ).
  739
  740read_pipeline2(Redis, S, PipeLine) :-
  741    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  742    maplist(handle_push(Redis), Pushed),
  743    maplist(handle_error, Errors),
  744    maplist(bind_reply, PipeLine, Replies).
  745
  746redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
  747    !,
  748    redis_read_msg(S, ReplyIn, Reply, Error, Push).
  749redis_read_msg3(S, Var, Reply, Error, Push) :-
  750    redis_read_msg(S, Var, Reply, Error, Push).
  751
  752handle_push(Redis, Pushed) :-
  753    handle_push_messages(Pushed, Redis).
  754handle_error(Error) :-
  755    (   var(Error)
  756    ->  true
  757    ;   throw(Error)
  758    ).
  759bind_reply(_Command -> Reply0, Reply) :-
  760    !,
  761    Reply0 = Reply.
  762bind_reply(_Command, _).
 recover(+Error, +Redis, :Goal)
Error happened while running Goal on Redis. If this is a recoverable error (i.e., a network or disconnected peer), wait a little and try running Goal again.
  771:- meta_predicate recover(+, +, 0).  772
  773recover(Error, Redis, Goal) :-
  774    Error = error(Formal, _),
  775    reconnect_error(Formal),
  776    auto_reconnect(Redis),
  777    !,
  778    debug(redis(recover), '~p: got error ~p; trying to reconnect',
  779          [Redis, Error]),
  780    redis_disconnect(Redis, [force(true)]),
  781    (   wait_to_retry(Redis, Error)
  782    ->  call(Goal),
  783        retractall(failure(Redis, _))
  784    ;   throw(Error)
  785    ).
  786recover(Error, _, _) :-
  787    throw(Error).
  788
  789auto_reconnect(redis_connection(_,_,_,Options)) :-
  790    !,
  791    option(reconnect(true), Options).
  792auto_reconnect(Server) :-
  793    ground(Server),
  794    server(Server, _, Options),
  795    option(reconnect(true), Options, true).
  796
  797reconnect_error(io_error(_Action, _On)).
  798reconnect_error(socket_error(_Code, _)).
  799reconnect_error(syntax_error(unexpected_eof)).
  800reconnect_error(existence_error(stream, _)).
 wait(+Redis, +Error)
Wait for some time after a failure. First we wait for 10ms. This is doubled on each failure upto the setting max_retry_wait. If the setting max_retry_count is exceeded we fail and the called signals an exception.
  809:- dynamic failure/2 as volatile.  810
  811wait_to_retry(Redis, Error) :-
  812    redis_failures(Redis, Failures),
  813    setting(max_retry_count, Count),
  814    Failures < Count,
  815    Failures2 is Failures+1,
  816    redis_set_failures(Redis, Failures2),
  817    setting(max_retry_wait, MaxWait),
  818    Wait is min(MaxWait*100, 1<<Failures)/100.0,
  819    debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
  820    retry_message_level(Failures, Level),
  821    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
  822    sleep(Wait).
  823
  824redis_failures(redis_connection(_,_,Failures0,_), Failures) :-
  825    !,
  826    Failures = Failures0.
  827redis_failures(Server, Failures) :-
  828    atom(Server),
  829    (   failure(Server, Failures)
  830    ->  true
  831    ;   Failures = 0
  832    ).
  833
  834redis_set_failures(Connection, Count) :-
  835    compound(Connection),
  836    !,
  837    nb_setarg(3, Connection, Count).
  838redis_set_failures(Server, Count) :-
  839    atom(Server),
  840    retractall(failure(Server, _)),
  841    asserta(failure(Server, Count)).
  842
  843retry_message_level(0, warning) :- !.
  844retry_message_level(_, silent).
 redis(+Request)
Connect to the default redis server, call redist/3 using Request, disconnect and print the result. This predicate is intended for interactive usage.
  853redis(Req) :-
  854    setup_call_cleanup(
  855        redis_connect(default, C, []),
  856        redis1(C, Req, Out),
  857        redis_disconnect(C)),
  858    print(Out).
 redis_write(+Redis, +Command) is det
 redis_read(+Redis, -Reply) is det
Write command and read replies from a Redis server. These are building blocks for subscribing to event streams.
  866redis_write(Redis, Command) :-
  867    redis_stream(Redis, S, true),
  868    redis_write_msg(S, Command).
  869
  870redis_read(Redis, Reply) :-
  871    redis_stream(Redis, S, true),
  872    redis_read_stream(Redis, S, Reply).
  873
  874
  875		 /*******************************
  876		 *      HIGH LEVEL ACCESS	*
  877		 *******************************/
 redis_get_list(+Redis, +Key, -List) is det
 redis_get_list(+Redis, +Key, +ChunkSize, -List) is det
Get the content of a Redis list in List. If ChunkSize is given and smaller than the list length, List is returned as a lazy list. The actual values are requested using redis LRANGE requests. Note that this results in O(N^2) complexity. Using a lazy list is most useful for relatively short lists holding possibly large items.

Note that values retrieved are strings, unless the value was added using Term as prolog.

It seems possible for LLEN to return OK. I don't know why. As a work-around we return the empty list rather than an error.

See also
- lazy_list/2 for a discussion on the difference between lazy lists and normal lists.
  897redis_get_list(Redis, Key, List) :-
  898    redis_get_list(Redis, Key, -1, List).
  899
  900redis_get_list(Redis, Key, Chunk, List) :-
  901    redis(Redis, llen(Key), Len),
  902    (   Len == status(ok)
  903    ->  List = []
  904    ;   (   Chunk >= Len
  905        ;   Chunk == -1
  906        )
  907    ->  (   Len == 0
  908        ->  List = []
  909        ;   End is Len-1,
  910            list_range(Redis, Key, 0, End, List)
  911        )
  912    ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
  913    ).
  914
  915rlist_next(State, List, Tail) :-
  916    State = s(Redis,Key,Offset,Slice,Len),
  917    End is min(Len-1, Offset+Slice-1),
  918    list_range(Redis, Key, Offset, End, Elems),
  919    (   End =:= Len-1
  920    ->  List = Elems,
  921        Tail = []
  922    ;   Offset2 is Offset+Slice,
  923        nb_setarg(3, State, Offset2),
  924        append(Elems, Tail, List)
  925    ).
  926
  927% Redis LRANGE demands End > Start and returns inclusive.
  928
  929list_range(DB, Key, Start, Start, [Elem]) :-
  930    !,
  931    redis(DB, lindex(Key, Start), Elem).
  932list_range(DB, Key, Start, End, List) :-
  933    !,
  934    redis(DB, lrange(Key, Start, End), List).
 redis_set_list(+Redis, +Key, +List) is det
Associate a Redis key with a list. As Redis has no concept of an empty list, if List is [], Key is deleted. Note that key values are always strings in Redis. The same conversion rules as for redis/1-3 apply.
  945redis_set_list(Redis, Key, List) :-
  946    redis(Redis, del(Key), _),
  947    (   List == []
  948    ->  true
  949    ;   Term =.. [rpush,Key|List],
  950        redis(Redis, Term, _Count)
  951    ).
 redis_get_hash(+Redis, +Key, -Data:dict) is det
 redis_set_hash(+Redis, +Key, +Data:dict) is det
Put/get a Redis hash as a Prolog dict. Putting a dict first deletes Key. Note that in many cases applications will manage Redis hashes by key. redis_get_hash/3 is notably a user friendly alternative to the Redis HGETALL command. If the Redis hash is not used by other (non-Prolog) applications one may also consider using the Term as prolog syntax to store the Prolog dict as-is.
  964redis_get_hash(Redis, Key, Dict) :-
  965    redis(Redis, hgetall(Key), Dict as dict(auto)).
  966
  967redis_set_hash(Redis, Key, Dict) :-
  968    redis_array_dict(Array, _, Dict),
  969    Term =.. [hset,Key|Array],
  970    redis(Redis, del(Key), _),
  971    redis(Redis, Term, _Count).
 redis_array_dict(?Array, ?Tag, ?Dict) is det
Translate a Redis reply representing hash data into a SWI-Prolog dict. Array is either a list of alternating keys and values or a list of pairs. When translating to an array, this is always a list of alternating keys and values.
Arguments:
Tag- is the SWI-Prolog dict tag.
  982redis_array_dict(Array, Tag, Dict) :-
  983    nonvar(Array),
  984    !,
  985    array_to_pairs(Array, Pairs),
  986    dict_pairs(Dict, Tag, Pairs).
  987redis_array_dict(TwoList, Tag, Dict) :-
  988    dict_pairs(Dict, Tag, Pairs),
  989    pairs_to_array(Pairs, TwoList).
  990
  991array_to_pairs([], []) :-
  992    !.
  993array_to_pairs([NameS-Value|T0], [Name-Value|T]) :-
  994    !,                                  % RESP3 returns a map as pairs.
  995    atom_string(Name, NameS),
  996    array_to_pairs(T0, T).
  997array_to_pairs([NameS,Value|T0], [Name-Value|T]) :-
  998    atom_string(Name, NameS),
  999    array_to_pairs(T0, T).
 1000
 1001pairs_to_array([], []) :-
 1002    !.
 1003pairs_to_array([Name-Value|T0], [NameS,Value|T]) :-
 1004    atom_string(Name, NameS),
 1005    pairs_to_array(T0, T).
 redis_scan(+Redis, -LazyList, +Options) is det
 redis_sscan(+Redis, +Set, -LazyList, +Options) is det
 redis_hscan(+Redis, +Hash, -LazyList, +Options) is det
 redis_zscan(+Redis, +Set, -LazyList, +Options) is det
Map the Redis SCAN, SSCAN, HSCAN and ZSCAN` commands into a lazy list. For redis_scan/3 and redis_sscan/4 the result is a list of strings. For redis_hscan/4 and redis_zscan/4, the result is a list of pairs. Options processed:
match(Pattern)
Adds the MATCH subcommand, only returning matches for Pattern.
count(Count)
Adds the COUNT subcommand, giving a hint to the size of the chunks fetched.
type(Type)
Adds the TYPE subcommand, only returning answers of the indicated type.
See also
- lazy_list/2.
 1029redis_scan(Redis, LazyList, Options) :-
 1030    scan_options([match,count,type], Options, Parms),
 1031    lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList).
 1032
 1033redis_sscan(Redis, Set, LazyList, Options) :-
 1034    scan_options([match,count,type], Options, Parms),
 1035    lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList).
 1036
 1037redis_hscan(Redis, Hash, LazyList, Options) :-
 1038    scan_options([match,count,type], Options, Parms),
 1039    lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList).
 1040
 1041redis_zscan(Redis, Set, LazyList, Options) :-
 1042    scan_options([match,count,type], Options, Parms),
 1043    lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList).
 1044
 1045scan_options([], _, []).
 1046scan_options([H|T0], Options, [H,V|T]) :-
 1047    Term =.. [H,V],
 1048    option(Term, Options),
 1049    !,
 1050    scan_options(T0, Options, T).
 1051scan_options([_|T0], Options, T) :-
 1052    scan_options(T0, Options, T).
 1053
 1054
 1055scan_next(State, List, Tail) :-
 1056    State = s(Command,Redis,Cursor,Params),
 1057    Command =.. CList,
 1058    append(CList, [Cursor|Params], CList2),
 1059    Term =.. CList2,
 1060    redis(Redis, Term, [NewCursor,Elems0]),
 1061    scan_pairs(Command, Elems0, Elems),
 1062    (   NewCursor == 0
 1063    ->  List = Elems,
 1064        Tail = []
 1065    ;   nb_setarg(3, State, NewCursor),
 1066        append(Elems, Tail, List)
 1067    ).
 1068
 1069scan_pairs(hscan(_), List, Pairs) :-
 1070    !,
 1071    scan_pairs(List, Pairs).
 1072scan_pairs(zscan(_), List, Pairs) :-
 1073    !,
 1074    scan_pairs(List, Pairs).
 1075scan_pairs(_, List, List).
 1076
 1077scan_pairs([], []).
 1078scan_pairs([Key,Value|T0], [Key-Value|T]) :-
 1079    !,
 1080    scan_pairs(T0, T).
 1081scan_pairs([Key-Value|T0], [Key-Value|T]) :-
 1082    scan_pairs(T0, T).
 1083
 1084
 1085		 /*******************************
 1086		 *              ABOUT		*
 1087		 *******************************/
 redis_current_command(+Redis, ?Command) is nondet
 redis_current_command(+Redis, ?Command, -Properties) is nondet
True when Command has Properties. Fails if Command is not defined. The redis_current_command/3 version returns the command argument specification. See Redis documentation for an explanation.
 1096redis_current_command(Redis, Command) :-
 1097    redis_current_command(Redis, Command, _).
 1098
 1099redis_current_command(Redis, Command, Properties) :-
 1100    nonvar(Command),
 1101    !,
 1102    redis(Redis, command(info, Command), [[_|Properties]]).
 1103redis_current_command(Redis, Command, Properties) :-
 1104    redis(Redis, command, Commands),
 1105    member([Name|Properties], Commands),
 1106    atom_string(Command, Name).
 redis_property(+Redis, ?Property) is nondet
True if Property is a property of the Redis server. Currently uses redis(info, String) and parses the result. As this is for machine usage, properties names *_human are skipped.
 1114redis_property(Redis, Property) :-
 1115    redis(Redis, info, String),
 1116    info_terms(String, Terms),
 1117    member(Property, Terms).
 1118
 1119info_terms(Info, Pairs) :-
 1120    split_string(Info, "\n", "\r\n ", Lines),
 1121    convlist(info_line_term, Lines, Pairs).
 1122
 1123info_line_term(Line, Term) :-
 1124    sub_string(Line, B, _, A, :),
 1125    !,
 1126    sub_atom(Line, 0, B, _, Name),
 1127    \+ sub_atom(Name, _, _, 0, '_human'),
 1128    sub_string(Line, _, A, 0, ValueS),
 1129    (   number_string(Value, ValueS)
 1130    ->  true
 1131    ;   Value = ValueS
 1132    ),
 1133    Term =.. [Name,Value].
 1134
 1135
 1136		 /*******************************
 1137		 *            SUBSCRIBE		*
 1138		 *******************************/
 redis_subscribe(+Redis, +Channels, -Id, +Options) is det
Subscribe to one or more Redis PUB/SUB channels. This predicate creates a thread using thread_create/3 with the given Options. Once running, the thread listens for messages. The message content is a string or Prolog term as described in redis/3. On receiving a message, the following message is broadcasted:
redis(Id, Channel, Data)

If redis_unsubscribe/2 removes the last subscription, the thread terminates.

To simply print the incomming messages use e.g.

?- listen(redis(_, Channel, Data),
          format('Channel ~p got ~p~n', [Channel,Data])).
true.
?- redis_subscribe(default, test, Id, []).
Id = redis_pubsub_3,
?- redis(publish(test, "Hello world")).
Channel test got "Hello world"
1
true.
Arguments:
Id- is the thread identifier of the listening thread. Note that the Options alias(Name) can be used to get a system wide name.
 1168:- dynamic ( subscription/2,            % Id, Channel
 1169             listening/3                % Id, Connection, Thread
 1170           ) as volatile. 1171
 1172redis_subscribe(Redis, Spec, Id, Options) :-
 1173    atom(Redis),
 1174    !,
 1175    channels(Spec, Channels),
 1176    pubsub_thread_options(ThreadOptions, Options),
 1177    thread_create(setup_call_cleanup(
 1178                      redis_connect(Redis, Conn, [reconnect(true)]),
 1179                      redis_subscribe1(Redis, Conn, Channels),
 1180                      redis_disconnect(Conn)),
 1181                  Thread,
 1182                  ThreadOptions),
 1183    pubsub_id(Thread, Id).
 1184redis_subscribe(Redis, Spec, Id, Options) :-
 1185    channels(Spec, Channels),
 1186    pubsub_thread_options(ThreadOptions, Options),
 1187    thread_create(redis_subscribe1(Redis, Redis, Channels),
 1188                  Thread,
 1189                  ThreadOptions),
 1190    pubsub_id(Thread, Id).
 1191
 1192pubsub_thread_options(ThreadOptions, Options) :-
 1193    merge_options(Options, [detached(true)], ThreadOptions).
 1194
 1195pubsub_id(Thread, Thread).
 1196%pubsub_id(Thread, Id) :-
 1197%    thread_property(Thread, id(TID)),
 1198%    atom_concat('redis_pubsub_', TID, Id).
 1199
 1200redis_subscribe1(Redis, Conn, Channels) :-
 1201    Error = error(Formal, _),
 1202    catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
 1203    (   var(Formal)
 1204    ->  true
 1205    ;   recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
 1206        thread_self(Me),
 1207        pubsub_id(Me, Id),
 1208        findall(Channel, subscription(Id, Channel), CurrentChannels),
 1209        redis_subscribe1(Redis, Conn, CurrentChannels)
 1210    ).
 1211
 1212redis_subscribe2(Redis, Conn, Channels) :-
 1213    redis_subscribe3(Conn, Channels),
 1214    redis_listen(Redis, Conn).
 1215
 1216redis_subscribe3(Conn, Channels) :-
 1217    thread_self(Me),
 1218    pubsub_id(Me, Id),
 1219    prolog_listen(this_thread_exit, pubsub_clean(Id)),
 1220    maplist(register_subscription(Id), Channels),
 1221    redis_stream(Conn, S, true),
 1222    Req =.. [subscribe|Channels],
 1223    redis_write_msg(S, Req).
 1224
 1225pubsub_clean(Id) :-
 1226    retractall(listening(Id, _Connection, _Thread)),
 1227    retractall(subscription(Id, _Channel)).
 redis_subscribe(+Id, +Channels) is det
 redis_unsubscribe(+Id, +Channels) is det
Add/remove channels from for the subscription. If no subscriptions remain, the listening thread terminates.
Arguments:
Channels- is either a single channel or a list thereof. Each channel specification is either an atom or a term `A:B:...`, where all parts are atoms.
 1239redis_subscribe(Id, Spec) :-
 1240    channels(Spec, Channels),
 1241    (   listening(Id, Connection, _Thread)
 1242    ->  true
 1243    ;   existence_error(redis_pubsub, Id)
 1244    ),
 1245    maplist(register_subscription(Id), Channels),
 1246    redis_stream(Connection, S, true),
 1247    Req =.. [subscribe|Channels],
 1248    redis_write_msg(S, Req).
 1249
 1250redis_unsubscribe(Id, Spec) :-
 1251    channels(Spec, Channels),
 1252    (   listening(Id, Connection, _Thread)
 1253    ->  true
 1254    ;   existence_error(redis_pubsub, Id)
 1255    ),
 1256    maplist(unregister_subscription(Id), Channels),
 1257    redis_stream(Connection, S, true),
 1258    Req =.. [unsubscribe|Channels],
 1259    redis_write_msg(S, Req).
 redis_current_subscription(?Id, ?Channels)
True when a PUB/SUB subscription with Id is listening on Channels.
 1265redis_current_subscription(Id, Channels) :-
 1266    findall(Id-Channel, subscription(Id, Channel), Pairs),
 1267    keysort(Pairs, Sorted),
 1268    group_pairs_by_key(Sorted, Grouped),
 1269    member(Id-Channels, Grouped).
 1270
 1271channels(Spec, List) :-
 1272    is_list(Spec),
 1273    !,
 1274    maplist(channel_name, Spec, List).
 1275channels(Ch, [Key]) :-
 1276    channel_name(Ch, Key).
 1277
 1278channel_name(Atom, Atom) :-
 1279    atom(Atom),
 1280    !.
 1281channel_name(Key, Atom) :-
 1282    phrase(key_parts(Key), Parts),
 1283    !,
 1284    atomic_list_concat(Parts, :, Atom).
 1285channel_name(Key, _) :-
 1286    type_error(redis_key, Key).
 1287
 1288key_parts(Var) -->
 1289    { var(Var), !, fail }.
 1290key_parts(Atom) -->
 1291    { atom(Atom) },
 1292    !,
 1293    [Atom].
 1294key_parts(A:B) -->
 1295    key_parts(A),
 1296    key_parts(B).
 1297
 1298
 1299
 1300
 1301register_subscription(Id, Channel) :-
 1302    (   subscription(Id, Channel)
 1303    ->  true
 1304    ;   assertz(subscription(Id, Channel))
 1305    ).
 1306
 1307unregister_subscription(Id, Channel) :-
 1308    retractall(subscription(Id, Channel)).
 1309
 1310redis_listen(Redis, Conn) :-
 1311    thread_self(Me),
 1312    pubsub_id(Me, Id),
 1313    setup_call_cleanup(
 1314        assertz(listening(Id, Conn, Me), Ref),
 1315        redis_listen_loop(Redis, Id, Conn),
 1316        erase(Ref)).
 1317
 1318redis_listen_loop(Redis, Id, Conn) :-
 1319    redis_stream(Conn, S, true),
 1320    (   subscription(Id, _)
 1321    ->  redis_read_stream(Redis, S, Reply),
 1322        redis_broadcast(Redis, Reply),
 1323        redis_listen_loop(Redis, Id, Conn)
 1324    ;   true
 1325    ).
 1326
 1327redis_broadcast(_, [subscribe, _Channel, _N]) :-
 1328    !.
 1329redis_broadcast(Redis, [message, Channel, Data]) :-
 1330    !,
 1331    catch(broadcast(redis(Redis, Channel, Data)),
 1332          Error,
 1333          print_message(error, Error)).
 1334redis_broadcast(Redis, Message) :-
 1335    assertion((Message = [Type, Channel, _Data],
 1336               atom(Type),
 1337               atom(Channel))),
 1338    debug(redis(warning), '~p: Unknown message while listening: ~p',
 1339          [Redis,Message]).
 1340
 1341
 1342		 /*******************************
 1343		 *          READ/WRITE		*
 1344		 *******************************/
 redis_read_stream(+Redis, +Stream, -Term) is det
Read a message from a Redis stream. Term is one of

If something goes wrong, the connection is closed and an exception is raised.

 1361redis_read_stream(Redis, SI, Out) :-
 1362    E = error(Formal,_),
 1363    catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
 1364    (   var(Formal)
 1365    ->  handle_push_messages(Push, Redis),
 1366        (   var(Error)
 1367        ->  Out = Out0
 1368        ;   resync(Redis),
 1369            throw(Error)
 1370        )
 1371    ;   redis_disconnect(Redis, [force(true)]),
 1372        throw(E)
 1373    ).
 1374
 1375handle_push_messages([], _).
 1376handle_push_messages([H|T], Redis) :-
 1377    (   catch(handle_push_message(H, Redis), E,
 1378              print_message(warning, E))
 1379    ->  true
 1380    ;   true
 1381    ),
 1382    handle_push_messages(T, Redis).
 1383
 1384handle_push_message(["pubsub"|List], Redis) :-
 1385    redis_broadcast(Redis, List).
 1386% some protocol version 3 push messages (such as
 1387% __keyspace@* events) seem to come directly
 1388% without a pubsub header
 1389handle_push_message([message|List], Redis) :-
 1390    redis_broadcast(Redis, [message|List]).
 resync(+Redis) is det
Re-synchronize after an error. This may happen if some type conversion fails and we have read a partial reply. It is hard to figure out what to read from where we are, so we echo a random magic sequence and read until we find the reply.
 1400resync(Redis) :-
 1401    E = error(Formal,_),
 1402    catch(do_resync(Redis), E, true),
 1403    (   var(Formal)
 1404    ->  true
 1405    ;   redis_disconnect(Redis, [force(true)])
 1406    ).
 1407
 1408do_resync(Redis) :-
 1409    A is random(1_000_000_000),
 1410    redis_stream(Redis, S, true),
 1411    redis_write_msg(S, echo(A)),
 1412    catch(call_with_time_limit(0.2, '$redis_resync'(S, A)),
 1413          time_limit_exceeded,
 1414          throw(error(time_limit_exceeded,_))).
 redis_read_msg(+Stream, -Message, -Error, -PushMessages) is det
 redis_write_msg(+Stream, +Message) is det
Read/write a Redis message. Both these predicates are in the foreign module redis4pl.
Arguments:
PushMessages- is a list of push messages that may be non-[] if protocol version 3 (see redis_connect/3) is selected. Using protocol version 2 this list is always empty.
 1429		 /*******************************
 1430		 *            MESSAGES		*
 1431		 *******************************/
 1432
 1433:- multifile
 1434    prolog:error_message//1,
 1435    prolog:message//1. 1436
 1437prolog:error_message(redis_error(Code, String)) -->
 1438    [ 'REDIS: ~w: ~s'-[Code, String] ].
 1439
 1440prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1441    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1442    [ '    '-[] ], '$messages':translate_message(Error)