View source with raw comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2019, Torbjörn Lager,
    8                              VU University Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(pengines,
   38          [ pengine_create/1,                   % +Options
   39            pengine_ask/3,                      % +Pengine, :Query, +Options
   40            pengine_next/2,                     % +Pengine. +Options
   41            pengine_stop/2,                     % +Pengine. +Options
   42            pengine_event/2,                    % -Event, +Options
   43            pengine_input/2,                    % +Prompt, -Term
   44            pengine_output/1,                   % +Term
   45            pengine_respond/3,                  % +Pengine, +Input, +Options
   46            pengine_debug/2,                    % +Format, +Args
   47            pengine_self/1,                     % -Pengine
   48            pengine_pull_response/2,            % +Pengine, +Options
   49            pengine_destroy/1,                  % +Pengine
   50            pengine_destroy/2,                  % +Pengine, +Options
   51            pengine_abort/1,                    % +Pengine
   52            pengine_application/1,              % +Application
   53            current_pengine_application/1,      % ?Application
   54            pengine_property/2,                 % ?Pengine, ?Property
   55            pengine_user/1,                     % -User
   56            pengine_event_loop/2,               % :Closure, +Options
   57            pengine_rpc/2,                      % +Server, :Goal
   58            pengine_rpc/3                       % +Server, :Goal, +Options
   59          ]).

Pengines: Web Logic Programming Made Easy

The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.

author
- Torbjörn Lager and Jan Wielemaker */
   70:- use_module(library(http/http_dispatch)).   71:- use_module(library(http/http_parameters)).   72:- use_module(library(http/http_client)).   73:- use_module(library(http/http_json)).   74:- use_module(library(http/http_open)).   75:- use_module(library(http/http_stream)).   76:- use_module(library(http/http_wrapper)).   77:- use_module(library(http/http_cors)).   78:- use_module(library(thread_pool)).   79:- use_module(library(broadcast)).   80:- use_module(library(uri)).   81:- use_module(library(filesex)).   82:- use_module(library(time)).   83:- use_module(library(lists)).   84:- use_module(library(charsio)).   85:- use_module(library(apply)).   86:- use_module(library(aggregate)).   87:- use_module(library(option)).   88:- use_module(library(settings)).   89:- use_module(library(debug)).   90:- use_module(library(error)).   91:- use_module(library(sandbox)).   92:- use_module(library(modules)).   93:- use_module(library(term_to_json)).   94:- if(exists_source(library(uuid))).   95:- use_module(library(uuid)).   96:- endif.   97
   98
   99:- meta_predicate
  100    pengine_create(:),
  101    pengine_rpc(+, +, :),
  102    pengine_event_loop(1, +).  103
  104:- multifile
  105    write_result/3,                 % +Format, +Event, +Dict
  106    event_to_json/3,                % +Event, -JSON, +Format
  107    prepare_module/3,               % +Module, +Application, +Options
  108    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  109    authentication_hook/3,          % +Request, +Application, -User
  110    not_sandboxed/2.                % +User, +App
  111
  112:- predicate_options(pengine_create/1, 1,
  113                     [ id(-atom),
  114                       alias(atom),
  115                       application(atom),
  116                       destroy(boolean),
  117                       server(atom),
  118                       ask(compound),
  119                       template(compound),
  120                       chunk(integer),
  121                       bindings(list),
  122                       src_list(list),
  123                       src_text(any),           % text
  124                       src_url(atom),
  125                       src_predicates(list)
  126                     ]).  127:- predicate_options(pengine_ask/3, 3,
  128                     [ template(any),
  129                       chunk(integer),
  130                       bindings(list)
  131                     ]).  132:- predicate_options(pengine_next/2, 2,
  133                     [ chunk(integer),
  134                       pass_to(pengine_send/3, 3)
  135                     ]).  136:- predicate_options(pengine_stop/2, 2,
  137                     [ pass_to(pengine_send/3, 3)
  138                     ]).  139:- predicate_options(pengine_respond/3, 2,
  140                     [ pass_to(pengine_send/3, 3)
  141                     ]).  142:- predicate_options(pengine_rpc/3, 3,
  143                     [ chunk(integer),
  144                       pass_to(pengine_create/1, 1)
  145                     ]).  146:- predicate_options(pengine_send/3, 3,
  147                     [ delay(number)
  148                     ]).  149:- predicate_options(pengine_event/2, 2,
  150                     [ pass_to(thread_get_message/3, 3)
  151                     ]).  152:- predicate_options(pengine_pull_response/2, 2,
  153                     [ pass_to(http_open/3, 3)
  154                     ]).  155:- predicate_options(pengine_event_loop/2, 2,
  156                     []).                       % not yet implemented
  157
  158% :- debug(pengine(transition)).
  159:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  160
  161goal_expansion(random_delay, Expanded) :-
  162    (   debugging(pengine(delay))
  163    ->  Expanded = do_random_delay
  164    ;   Expanded = true
  165    ).
  166
  167do_random_delay :-
  168    Delay is random(20)/1000,
  169    sleep(Delay).
  170
  171:- meta_predicate                       % internal meta predicates
  172    solve(+, ?, 0, +),
  173    findnsols_no_empty(+, ?, 0, -),
  174    pengine_event_loop(+, 1, +).
 pengine_create(:Options) is det
Creates a new pengine. Valid options are:
id(-ID)
ID gets instantiated to the id of the created pengine. ID is atomic.
alias(+Name)
The pengine is named Name (an atom). A slave pengine (child) can subsequently be referred to by this name.
application(+Application)
Application in which the pengine runs. See pengine_application/1.
server(+URL)
The pengine will run in (and in the Prolog context of) the pengine server located at URL.
src_list(+List_of_clauses)
Inject a list of Prolog clauses into the pengine.
src_text(+Atom_or_string)
Inject the clauses specified by a source text into the pengine.
src_url(+URL)
Inject the clauses specified in the file located at URL into the pengine.
src_predicates(+List)
Send the local predicates denoted by List to the remote pengine. List is a list of predicate indicators.

Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options..

Successful creation of a pengine will return an event term of the following form:

create(ID, Term)
ID is the id of the pengine that was created. Term is not used at the moment.

An error will be returned if the pengine could not be created:

error(ID, Term)
ID is invalid, since no pengine was created. Term is the exception's error term. */
  229pengine_create(M:Options0) :-
  230    translate_local_sources(Options0, Options, M),
  231    (   select_option(server(BaseURL), Options, RestOptions)
  232    ->  remote_pengine_create(BaseURL, RestOptions)
  233    ;   local_pengine_create(Options)
  234    ).
 translate_local_sources(+OptionsIn, -Options, +Module) is det
Translate the src_predicates and src_list options into src_text. We need to do that anyway for remote pengines. For local pengines, we could avoid this step, but there is very little point in transferring source to a local pengine anyway as local pengines can access any Prolog predicate that you make visible to the application.

Multiple sources are concatenated to end up with a single src_text option.

  248translate_local_sources(OptionsIn, Options, Module) :-
  249    translate_local_sources(OptionsIn, Sources, Options2, Module),
  250    (   Sources == []
  251    ->  Options = Options2
  252    ;   Sources = [Source]
  253    ->  Options = [src_text(Source)|Options2]
  254    ;   atomics_to_string(Sources, Source)
  255    ->  Options = [src_text(Source)|Options2]
  256    ).
  257
  258translate_local_sources([], [], [], _).
  259translate_local_sources([H0|T], [S0|S], Options, M) :-
  260    nonvar(H0),
  261    translate_local_source(H0, S0, M),
  262    !,
  263    translate_local_sources(T, S, Options, M).
  264translate_local_sources([H|T0], S, [H|T], M) :-
  265    translate_local_sources(T0, S, T, M).
  266
  267translate_local_source(src_predicates(PIs), Source, M) :-
  268    must_be(list, PIs),
  269    with_output_to(string(Source),
  270                   maplist(list_in_module(M), PIs)).
  271translate_local_source(src_list(Terms), Source, _) :-
  272    must_be(list, Terms),
  273    with_output_to(string(Source),
  274                   forall(member(Term, Terms),
  275                          format('~k .~n', [Term]))).
  276translate_local_source(src_text(Source), Source, _).
  277
  278list_in_module(M, PI) :-
  279    listing(M:PI).
 pengine_send(+NameOrID, +Term) is det
Same as pengine_send(NameOrID, Term, []). */
  286pengine_send(Target, Event) :-
  287    pengine_send(Target, Event, []).
 pengine_send(+NameOrID, +Term, +Options) is det
Succeeds immediately and places Term in the queue of the pengine NameOrID. Options is a list of options:
delay(+Time)
The actual sending is delayed by Time seconds. Time is an integer or a float.

Any remaining options are passed to http_open/3. */

  302pengine_send(Target, Event, Options) :-
  303    must_be(atom, Target),
  304    pengine_send2(Target, Event, Options).
  305
  306pengine_send2(self, Event, Options) :-
  307    !,
  308    thread_self(Queue),
  309    delay_message(queue(Queue), Event, Options).
  310pengine_send2(Name, Event, Options) :-
  311    child(Name, Target),
  312    !,
  313    delay_message(pengine(Target), Event, Options).
  314pengine_send2(Target, Event, Options) :-
  315    delay_message(pengine(Target), Event, Options).
  316
  317delay_message(Target, Event, Options) :-
  318    option(delay(Delay), Options),
  319    !,
  320    alarm(Delay,
  321          send_message(Target, Event, Options),
  322          _AlarmID,
  323          [remove(true)]).
  324delay_message(Target, Event, Options) :-
  325    random_delay,
  326    send_message(Target, Event, Options).
  327
  328send_message(queue(Queue), Event, _) :-
  329    thread_send_message(Queue, pengine_request(Event)).
  330send_message(pengine(Pengine), Event, Options) :-
  331    (   pengine_remote(Pengine, Server)
  332    ->  remote_pengine_send(Server, Pengine, Event, Options)
  333    ;   pengine_thread(Pengine, Thread)
  334    ->  thread_send_message(Thread, pengine_request(Event))
  335    ;   existence_error(pengine, Pengine)
  336    ).
 pengine_request(-Request) is det
To be used by a pengine to wait for the next request. Such messages are placed in the queue by pengine_send/2.
  343pengine_request(Request) :-
  344    pengine_self(Self),
  345    get_pengine_application(Self, Application),
  346    setting(Application:idle_limit, IdleLimit),
  347    thread_self(Me),
  348    (   thread_get_message(Me, pengine_request(Request), [timeout(IdleLimit)])
  349    ->  true
  350    ;   Request = destroy
  351    ).
 pengine_reply(+Event) is det
 pengine_reply(+Queue, +Event) is det
Reply Event to the parent of the current Pengine or the given Queue. Such events are read by the other side with pengine_event/1.

If the message cannot be sent within the idle_limit setting of the pengine, abort the pengine.

  364pengine_reply(Event) :-
  365    pengine_parent(Queue),
  366    pengine_reply(Queue, Event).
  367
  368pengine_reply(_Queue, _Event0) :-
  369    nb_current(pengine_idle_limit_exceeded, true),
  370    !.
  371pengine_reply(Queue, Event0) :-
  372    arg(1, Event0, ID),
  373    wrap_first_answer(ID, Event0, Event),
  374    random_delay,
  375    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  376    (   pengine_self(ID),
  377        \+ pengine_detached(ID, _)
  378    ->  get_pengine_application(ID, Application),
  379        setting(Application:idle_limit, IdleLimit),
  380        debug(pengine(reply), 'Sending ~p, timout: ~q', [Event, IdleLimit]),
  381        (   thread_send_message(Queue, pengine_event(ID, Event),
  382                                [ timeout(IdleLimit)
  383                                ])
  384        ->  true
  385        ;   thread_self(Me),
  386            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  387                  [ID, Me]),
  388            nb_setval(pengine_idle_limit_exceeded, true),
  389            thread_detach(Me),
  390            abort
  391        )
  392    ;   thread_send_message(Queue, pengine_event(ID, Event))
  393    ).
  394
  395wrap_first_answer(ID, Event0, CreateEvent) :-
  396    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  397    arg(1, CreateEvent, ID),
  398    !,
  399    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  400wrap_first_answer(_ID, Event, Event).
  401
  402
  403empty_queue :-
  404    pengine_parent(Queue),
  405    empty_queue(Queue, 0, Discarded),
  406    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  407
  408empty_queue(Queue, C0, C) :-
  409    thread_get_message(Queue, _Term, [timeout(0)]),
  410    !,
  411    C1 is C0+1,
  412    empty_queue(Queue, C1, C).
  413empty_queue(_, C, C).
 pengine_ask(+NameOrID, @Query, +Options) is det
Asks pengine NameOrID a query Query.

Options is a list of options:

template(+Template)
Template is a variable (or a term containing variables) shared with the query. By default, the template is identical to the query.
chunk(+Integer)
Retrieve solutions in chunks of Integer rather than one by one. 1 means no chunking (default). Other integers indicate the maximum number of solutions to retrieve in one chunk.
bindings(+Bindings)
Sets the global variable '$variable_names' to a list of Name = Var terms, providing access to the actual variable names.

Any remaining options are passed to pengine_send/3.

Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.

success(ID, Terms, Projection, Time, More)
ID is the id of the pengine that succeeded in solving the query. Terms is a list holding instantiations of Template. Projection is a list of variable names that should be displayed. Time is the CPU time used to produce the results and finally, More is either true or false, indicating whether we can expect the pengine to be able to return more solutions or not, would we call pengine_next/2.
failure(ID)
ID is the id of the pengine that failed for lack of a solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, like so:

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, AskOptions, SendOptions),
    pengine_send(ID, ask(Query, AskOptions), SendOptions).

*/

  478pengine_ask(ID, Query, Options) :-
  479    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  480    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  481
  482
  483pengine_ask_option(template(_)).
  484pengine_ask_option(chunk(_)).
  485pengine_ask_option(bindings(_)).
  486pengine_ask_option(breakpoints(_)).
 pengine_next(+NameOrID, +Options) is det
Asks pengine NameOrID for the next solution to a query started by pengine_ask/3. Defined options are:
chunk(+Count)
Modify the chunk-size to Count before asking the next set of solutions.

Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.

success(ID, Terms, Projection, Time, More)
See pengine_ask/3.
failure(ID)
ID is the id of the pengine that failed for lack of more solutions.
error(ID, Term)
ID is the id of the pengine throwing the exception. Term is the exception's error term.
output(ID, Term)
ID is the id of a pengine running the query that called pengine_output/1. Term is the term that was passed in the first argument of pengine_output/1 when it was called.
prompt(ID, Term)
ID is the id of the pengine that called pengine_input/2 and Term is the prompt.

Defined in terms of pengine_send/3, as follows:

pengine_next(ID, Options) :-
    pengine_send(ID, next, Options).

*/

  530pengine_next(ID, Options) :-
  531    select_option(chunk(Count), Options, Options1),
  532    !,
  533    pengine_send(ID, next(Count), Options1).
  534pengine_next(ID, Options) :-
  535    pengine_send(ID, next, Options).
 pengine_stop(+NameOrID, +Options) is det
Tells pengine NameOrID to stop looking for more solutions to a query started by pengine_ask/3. Options are passed to pengine_send/3.

Defined in terms of pengine_send/3, like so:

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).

*/

  551pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
 pengine_abort(+NameOrID) is det
Aborts the running query. The pengine goes back to state `2', waiting for new queries.
See also
- pengine_destroy/1. */
  562pengine_abort(Name) :-
  563    (   child(Name, Pengine)
  564    ->  true
  565    ;   Pengine = Name
  566    ),
  567    (   pengine_remote(Pengine, Server)
  568    ->  remote_pengine_abort(Server, Pengine, [])
  569    ;   pengine_thread(Pengine, Thread),
  570        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  571        catch(thread_signal(Thread, throw(abort_query)), _, true)
  572    ).
 pengine_destroy(+NameOrID) is det
 pengine_destroy(+NameOrID, +Options) is det
Destroys the pengine NameOrID. With the option force(true), the pengine is killed using abort/0 and pengine_destroy/2 succeeds. */
  582pengine_destroy(ID) :-
  583    pengine_destroy(ID, []).
  584
  585pengine_destroy(Name, Options) :-
  586    (   child(Name, ID)
  587    ->  true
  588    ;   ID = Name
  589    ),
  590    option(force(true), Options),
  591    !,
  592    (   pengine_thread(ID, Thread)
  593    ->  catch(thread_signal(Thread, abort),
  594              error(existence_error(thread, _), _), true)
  595    ;   true
  596    ).
  597pengine_destroy(ID, _) :-
  598    catch(pengine_send(ID, destroy),
  599          error(existence_error(pengine, ID), _),
  600          retractall(child(_,ID))).
  601
  602
  603/*================= pengines administration =======================
  604*/
 current_pengine(?Id, ?Parent, ?Location)
Dynamic predicate that registers our known pengines. Id is an atomic unique datatype. Parent is the id of our parent pengine. Location is one of
  615:- dynamic
  616    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  617    pengine_queue/4,                % Id, Queue, TimeOut, Time
  618    output_queue/3,                 % Id, Queue, Time
  619    pengine_user/2,                 % Id, User
  620    pengine_data/2,                 % Id, Data
  621    pengine_detached/2.             % Id, Data
  622:- volatile
  623    current_pengine/6,
  624    pengine_queue/4,
  625    output_queue/3,
  626    pengine_user/2,
  627    pengine_data/2,
  628    pengine_detached/2.  629
  630:- thread_local
  631    child/2.                        % ?Name, ?Child
 pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det
 pengine_register_remote(+Id, +URL, +Queue, +App, +Destroy) is det
 pengine_unregister(+Id) is det
  637pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  638    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  639
  640pengine_register_remote(Id, URL, Application, Destroy) :-
  641    thread_self(Queue),
  642    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
 pengine_unregister(+Id)
Called by the pengine thread destruction. If we are a remote pengine thread, our URL equals http and the queue is the message queue used to send events to the HTTP workers.
  650pengine_unregister(Id) :-
  651    thread_self(Me),
  652    (   current_pengine(Id, Queue, Me, http, _, _)
  653    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  654    ;   true
  655    ),
  656    retractall(current_pengine(Id, _, Me, _, _, _)),
  657    retractall(pengine_user(Id, _)),
  658    retractall(pengine_data(Id, _)).
  659
  660pengine_unregister_remote(Id) :-
  661    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
 pengine_self(-Id) is det
True if the current thread is a pengine with Id.
  667pengine_self(Id) :-
  668    thread_self(Thread),
  669    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  670
  671pengine_parent(Parent) :-
  672    nb_getval(pengine_parent, Parent).
  673
  674pengine_thread(Pengine, Thread) :-
  675    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  676    Thread \== 0,
  677    !.
  678
  679pengine_remote(Pengine, URL) :-
  680    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  681
  682get_pengine_application(Pengine, Application) :-
  683    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  684    !.
  685
  686get_pengine_module(Pengine, Pengine).
  687
  688:- if(current_predicate(uuid/2)).  689pengine_uuid(Id) :-
  690    uuid(Id, [version(4)]).             % Version 4 is random.
  691:- else.  692:- use_module(library(random)).  693pengine_uuid(Id) :-
  694    Max is 1<<128,
  695    random_between(0, Max, Num),
  696    atom_number(Id, Num).
  697:- endif.
 protect_pengine(+Id, :Goal) is semidet
Run Goal while protecting the Pengine Id from being destroyed. Used by the HTTP I/O routines to avoid that the Pengine's module disappears while I/O is in progress. We use a pool of locks because the lock may be held relatively long by output routines.

This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.

bug
- After destroy_or_continue/1 takes the destroy route, the module may drop-out at any point in time, resulting in a possible crash. Seems the only safe way out is to do (de)serialization inside the Pengine.
  714:- meta_predicate protect_pengine(+, 0).  715
  716protect_pengine(Id, Goal) :-
  717    term_hash(Id, Hash),
  718    LockN is Hash mod 64,
  719    atom_concat(pengine_done_, LockN, Lock),
  720    with_mutex(Lock,
  721               (   pengine_thread(Id, _)
  722               ->  Goal
  723               ;   Goal
  724               )).
 pengine_application(+Application) is det
Directive that must be used to declare a pengine application module. The module must not be associated to any file. The default application is pengine_sandbox. The example below creates a new application address_book and imports the API defined in the module file adress_book_api.pl into the application.
:- pengine_application(address_book).
:- use_module(address_book:adress_book_api).

*/

  741pengine_application(Application) :-
  742    throw(error(context_error(nodirective,
  743                             pengine_application(Application)), _)).
  744
  745:- multifile
  746    system:term_expansion/2,
  747    current_application/1.
 current_pengine_application(?Application) is nondet
True when Application is a currently defined application.
See also
- pengine_application/1
  755current_pengine_application(Application) :-
  756    current_application(Application).
  757
  758
  759% Default settings for all applications
  760
  761:- setting(thread_pool_size, integer, 100,
  762           'Maximum number of pengines this application can run.').  763:- setting(thread_pool_stacks, list(compound), [],
  764           'Maximum stack sizes for pengines this application can run.').  765:- setting(slave_limit, integer, 3,
  766           'Maximum number of slave pengines a master pengine can create.').  767:- setting(time_limit, number, 300,
  768           'Maximum time to wait for output').  769:- setting(idle_limit, number, 300,
  770           'Pengine auto-destroys when idle for this time').  771:- setting(safe_goal_limit, number, 10,
  772           'Maximum time to try proving safety of the goal').  773:- setting(program_space, integer, 100_000_000,
  774           'Maximum memory used by predicates').  775:- setting(allow_from, list(atom), [*],
  776           'IP addresses from which remotes are allowed to connect').  777:- setting(deny_from, list(atom), [],
  778           'IP addresses from which remotes are NOT allowed to connect').  779:- setting(debug_info, boolean, false,
  780           'Keep information to support source-level debugging').  781
  782
  783system:term_expansion((:- pengine_application(Application)), Expanded) :-
  784    must_be(atom, Application),
  785    (   module_property(Application, file(_))
  786    ->  permission_error(create, pengine_application, Application)
  787    ;   true
  788    ),
  789    expand_term((:- setting(Application:thread_pool_size, integer,
  790                            setting(pengines:thread_pool_size),
  791                            'Maximum number of pengines this \c
  792                            application can run.')),
  793                ThreadPoolSizeSetting),
  794    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  795                            setting(pengines:thread_pool_stacks),
  796                            'Maximum stack sizes for pengines \c
  797                            this application can run.')),
  798                ThreadPoolStacksSetting),
  799    expand_term((:- setting(Application:slave_limit, integer,
  800                            setting(pengines:slave_limit),
  801                            'Maximum number of local slave pengines \c
  802                            a master pengine can create.')),
  803                SlaveLimitSetting),
  804    expand_term((:- setting(Application:time_limit, number,
  805                            setting(pengines:time_limit),
  806                            'Maximum time to wait for output')),
  807                TimeLimitSetting),
  808    expand_term((:- setting(Application:idle_limit, number,
  809                            setting(pengines:idle_limit),
  810                            'Pengine auto-destroys when idle for this time')),
  811                IdleLimitSetting),
  812    expand_term((:- setting(Application:safe_goal_limit, number,
  813                            setting(pengines:safe_goal_limit),
  814                            'Maximum time to try proving safety of the goal')),
  815                SafeGoalLimitSetting),
  816    expand_term((:- setting(Application:program_space, integer,
  817                            setting(pengines:program_space),
  818                            'Maximum memory used by predicates')),
  819                ProgramSpaceSetting),
  820    expand_term((:- setting(Application:allow_from, list(atom),
  821                            setting(pengines:allow_from),
  822                            'IP addresses from which remotes are allowed \c
  823                            to connect')),
  824                AllowFromSetting),
  825    expand_term((:- setting(Application:deny_from, list(atom),
  826                            setting(pengines:deny_from),
  827                            'IP addresses from which remotes are NOT \c
  828                            allowed to connect')),
  829                DenyFromSetting),
  830    expand_term((:- setting(Application:debug_info, boolean,
  831                            setting(pengines:debug_info),
  832                            'Keep information to support source-level \c
  833                            debugging')),
  834                DebugInfoSetting),
  835    flatten([ pengines:current_application(Application),
  836              ThreadPoolSizeSetting,
  837              ThreadPoolStacksSetting,
  838              SlaveLimitSetting,
  839              TimeLimitSetting,
  840              IdleLimitSetting,
  841              SafeGoalLimitSetting,
  842              ProgramSpaceSetting,
  843              AllowFromSetting,
  844              DenyFromSetting,
  845              DebugInfoSetting
  846            ], Expanded).
  847
  848% Register default application
  849
  850:- pengine_application(pengine_sandbox).
 pengine_property(?Pengine, ?Property) is nondet
True when Property is a property of the given Pengine. Enumerates all pengines that are known to the calling Prolog process. Defined properties are:
self(ID)
Identifier of the pengine. This is the same as the first argument, and can be used to enumerate all known pengines.
alias(Name)
Name is the alias name of the pengine, as provided through the alias option when creating the pengine.
thread(Thread)
If the pengine is a local pengine, Thread is the Prolog thread identifier of the pengine.
remote(Server)
If the pengine is remote, the URL of the server.
application(Application)
Pengine runs the given application
module(Module)
Temporary module used for running the Pengine.
destroy(Destroy)
Destroy is true if the pengines is destroyed automatically after completing the query.
parent(Queue)
Message queue to which the (local) pengine reports.
source(?SourceID, ?Source)
Source is the source code with the given SourceID. May be present if the setting debug_info is present.
detached(?Time)
Pengine was detached at Time. */
  887pengine_property(Id, Prop) :-
  888    nonvar(Id), nonvar(Prop),
  889    pengine_property2(Id, Prop),
  890    !.
  891pengine_property(Id, Prop) :-
  892    pengine_property2(Prop, Id).
  893
  894pengine_property2(self(Id), Id) :-
  895    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  896pengine_property2(module(Id), Id) :-
  897    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  898pengine_property2(alias(Alias), Id) :-
  899    child(Alias, Id),
  900    Alias \== Id.
  901pengine_property2(thread(Thread), Id) :-
  902    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  903    Thread \== 0.
  904pengine_property2(remote(Server), Id) :-
  905    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  906pengine_property2(application(Application), Id) :-
  907    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  908pengine_property2(destroy(Destroy), Id) :-
  909    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  910pengine_property2(parent(Parent), Id) :-
  911    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  912pengine_property2(source(SourceID, Source), Id) :-
  913    pengine_data(Id, source(SourceID, Source)).
  914pengine_property2(detached(When), Id) :-
  915    pengine_detached(Id, When).
 pengine_output(+Term) is det
Sends Term to the parent pengine or thread. */
  922pengine_output(Term) :-
  923    pengine_self(Me),
  924    pengine_reply(output(Me, Term)).
 pengine_debug(+Format, +Args) is det
Create a message using format/3 from Format and Args and send this to the client. The default JavaScript client will call console.log(Message) if there is a console. The predicate pengine_rpc/3 calls debug(pengine(debug), '~w', [Message]). The debug topic pengine(debug) is enabled by default.
See also
- debug/1 and nodebug/1 for controlling the pengine(debug) topic
- format/2 for format specifications */
  939pengine_debug(Format, Args) :-
  940    pengine_parent(Queue),
  941    pengine_self(Self),
  942    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  943    (   var(E)
  944    ->  format(atom(Message), Format, Args)
  945    ;   message_to_string(E, Message)
  946    ),
  947    pengine_reply(Queue, debug(Self, Message)).
  948
  949
  950/*================= Local pengine =======================
  951*/
 local_pengine_create(+Options)
Creates a local Pengine, which is a thread running pengine_main/2. It maintains two predicates:
  962local_pengine_create(Options) :-
  963    thread_self(Self),
  964    option(application(Application), Options, pengine_sandbox),
  965    create(Self, Child, Options, local, Application),
  966    option(alias(Name), Options, Child),
  967    assert(child(Name, Child)).
 thread_pool:create_pool(+Application) is det
On demand creation of a thread pool for a pengine application.
  974thread_pool:create_pool(Application) :-
  975    current_application(Application),
  976    setting(Application:thread_pool_size, Size),
  977    setting(Application:thread_pool_stacks, Stacks),
  978    thread_pool_create(Application, Size, Stacks).
 create(+Queue, -Child, +Options, +URL, +Application) is det
Create a new pengine thread.
Arguments:
Queue- is the queue (or thread handle) to report to
Child- is the identifier of the created pengine.
URL- is one of local or http
  988create(Queue, Child, Options, local, Application) :-
  989    !,
  990    pengine_child_id(Child),
  991    create0(Queue, Child, Options, local, Application).
  992create(Queue, Child, Options, URL, Application) :-
  993    pengine_child_id(Child),
  994    catch(create0(Queue, Child, Options, URL, Application),
  995          Error,
  996          create_error(Queue, Child, Error)).
  997
  998pengine_child_id(Child) :-
  999    (   nonvar(Child)
 1000    ->  true
 1001    ;   pengine_uuid(Child)
 1002    ).
 1003
 1004create_error(Queue, Child, Error) :-
 1005    pengine_reply(Queue, error(Child, Error)).
 1006
 1007create0(Queue, Child, Options, URL, Application) :-
 1008    (  current_application(Application)
 1009    -> true
 1010    ;  existence_error(pengine_application, Application)
 1011    ),
 1012    (   URL \== http                    % pengine is _not_ a child of the
 1013                                        % HTTP server thread
 1014    ->  aggregate_all(count, child(_,_), Count),
 1015        setting(Application:slave_limit, Max),
 1016        (   Count >= Max
 1017        ->  throw(error(resource_error(max_pengines), _))
 1018        ;   true
 1019        )
 1020    ;   true
 1021    ),
 1022    partition(pengine_create_option, Options, PengineOptions, RestOptions),
 1023    thread_create_in_pool(
 1024        Application,
 1025        pengine_main(Queue, PengineOptions, Application), ChildThread,
 1026        [ at_exit(pengine_done)
 1027        | RestOptions
 1028        ]),
 1029    option(destroy(Destroy), PengineOptions, true),
 1030    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 1031    thread_send_message(ChildThread, pengine_registered(Child)),
 1032    (   option(id(Id), Options)
 1033    ->  Id = Child
 1034    ;   true
 1035    ).
 1036
 1037pengine_create_option(src_text(_)).
 1038pengine_create_option(src_url(_)).
 1039pengine_create_option(application(_)).
 1040pengine_create_option(destroy(_)).
 1041pengine_create_option(ask(_)).
 1042pengine_create_option(template(_)).
 1043pengine_create_option(bindings(_)).
 1044pengine_create_option(chunk(_)).
 1045pengine_create_option(alias(_)).
 1046pengine_create_option(user(_)).
 pengine_done is det
Called from the pengine thread at_exit option. Destroys child pengines using pengine_destroy/1. Cleaning up the Pengine is synchronised by the pengine_done mutex. See read_event/6.
 1055:- public
 1056    pengine_done/0. 1057
 1058pengine_done :-
 1059    thread_self(Me),
 1060    (   thread_property(Me, status(exception('$aborted'))),
 1061        thread_detach(Me),
 1062        pengine_self(Pengine)
 1063    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
 1064              error(_,_), true)
 1065    ;   true
 1066    ),
 1067    forall(child(_Name, Child),
 1068           pengine_destroy(Child)),
 1069    pengine_self(Id),
 1070    protect_pengine(Id, pengine_unregister(Id)).
 pengine_main(+Parent, +Options, +Application)
Run a pengine main loop. First acknowledges its creation and run pengine_main_loop/1.
 1078:- thread_local wrap_first_answer_in_create_event/2. 1079
 1080:- meta_predicate
 1081    pengine_prepare_source(:, +). 1082
 1083pengine_main(Parent, Options, Application) :-
 1084    fix_streams,
 1085    thread_get_message(pengine_registered(Self)),
 1086    nb_setval(pengine_parent, Parent),
 1087    pengine_register_user(Options),
 1088    set_prolog_flag(mitigate_spectre, true),
 1089    catch(in_temporary_module(
 1090              Self,
 1091              pengine_prepare_source(Application, Options),
 1092              pengine_create_and_loop(Self, Application, Options)),
 1093          prepare_source_failed,
 1094          pengine_terminate(Self)).
 1095
 1096pengine_create_and_loop(Self, Application, Options) :-
 1097    setting(Application:slave_limit, SlaveLimit),
 1098    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1099    (   option(ask(Query0), Options)
 1100    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1101        (   string(Query0)                      % string is not callable
 1102        ->  (   option(template(TemplateS), Options)
 1103            ->  Ask2 = Query0-TemplateS
 1104            ;   Ask2 = Query0
 1105            ),
 1106            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1107                  Error, true),
 1108            (   var(Error)
 1109            ->  true
 1110            ;   send_error(Error),
 1111                throw(prepare_source_failed)
 1112            )
 1113        ;   Query = Query0,
 1114            option(template(Template), Options, Query),
 1115            option(bindings(Bindings), Options, [])
 1116        ),
 1117        option(chunk(Chunk), Options, 1),
 1118        pengine_ask(Self, Query,
 1119                    [ template(Template),
 1120                      chunk(Chunk),
 1121                      bindings(Bindings)
 1122                    ])
 1123    ;   Extra = [],
 1124        pengine_reply(CreateEvent)
 1125    ),
 1126    pengine_main_loop(Self).
 ask_to_term(+AskSpec, +Module, -Options, OptionsTail) is det
Translate the AskSpec into a query, template and bindings. The trick is that we must parse using the operator declarations of the source and we must make sure variable sharing between query and answer template are known.
 1136ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
 1137    !,
 1138    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
 1139    term_string(t(Template1,Ask1), AskTemplate,
 1140                [ variable_names(Bindings0),
 1141                  module(Module)
 1142                ]),
 1143    phrase(template_bindings(Template1, Bindings0), Bindings).
 1144ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
 1145    term_string(Ask1, Ask,
 1146                [ variable_names(Bindings),
 1147                  module(Module)
 1148                ]),
 1149    exclude(anon, Bindings, Bindings1),
 1150    dict_create(Template, swish_default_template, Bindings1).
 1151
 1152template_bindings(Var, Bindings) -->
 1153    { var(Var) }, !,
 1154    (   { var_binding(Bindings, Var, Binding)
 1155        }
 1156    ->  [Binding]
 1157    ;   []
 1158    ).
 1159template_bindings([H|T], Bindings) -->
 1160    !,
 1161    template_bindings(H, Bindings),
 1162    template_bindings(T, Bindings).
 1163template_bindings(Compoound, Bindings) -->
 1164    { compound(Compoound), !,
 1165      compound_name_arguments(Compoound, _, Args)
 1166    },
 1167    template_bindings(Args, Bindings).
 1168template_bindings(_, _) --> [].
 1169
 1170var_binding(Bindings, Var, Binding) :-
 1171    member(Binding, Bindings),
 1172    arg(2, Binding, V),
 1173    V == Var, !.
 fix_streams is det
If we are a pengine that is created from a web server thread, the current output points to a CGI stream.
 1180fix_streams :-
 1181    fix_stream(current_output).
 1182
 1183fix_stream(Name) :-
 1184    is_cgi_stream(Name),
 1185    !,
 1186    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1187    set_stream(user_output, alias(Name)).
 1188fix_stream(_).
 pengine_prepare_source(:Application, +Options) is det
Load the source into the pengine's module.
throws
- prepare_source_failed if it failed to prepare the sources.
 1197pengine_prepare_source(Module:Application, Options) :-
 1198    setting(Application:program_space, SpaceLimit),
 1199    set_module(Module:program_space(SpaceLimit)),
 1200    delete_import_module(Module, user),
 1201    add_import_module(Module, Application, start),
 1202    catch(prep_module(Module, Application, Options), Error, true),
 1203    (   var(Error)
 1204    ->  true
 1205    ;   send_error(Error),
 1206        throw(prepare_source_failed)
 1207    ).
 1208
 1209prep_module(Module, Application, Options) :-
 1210    maplist(copy_flag(Module, Application), [var_prefix]),
 1211    forall(prepare_module(Module, Application, Options), true),
 1212    setup_call_cleanup(
 1213        '$set_source_module'(OldModule, Module),
 1214        maplist(process_create_option(Module), Options),
 1215        '$set_source_module'(OldModule)).
 1216
 1217copy_flag(Module, Application, Flag) :-
 1218    current_prolog_flag(Application:Flag, Value),
 1219    !,
 1220    set_prolog_flag(Module:Flag, Value).
 1221copy_flag(_, _, _).
 1222
 1223process_create_option(Application, src_text(Text)) :-
 1224    !,
 1225    pengine_src_text(Text, Application).
 1226process_create_option(Application, src_url(URL)) :-
 1227    !,
 1228    pengine_src_url(URL, Application).
 1229process_create_option(_, _).
 prepare_module(+Module, +Application, +Options) is semidet
Hook, called to initialize the temporary private module that provides the working context of a pengine. This hook is executed by the pengine's thread. Preparing the source consists of three steps:
  1. Add Application as (first) default import module for Module
  2. Call this hook
  3. Compile the source provided by the the src_text and src_url options
Arguments:
Module- is a new temporary module (see in_temporary_module/3) that may be (further) prepared by this hook.
Application- (also a module) associated to the pengine.
Options- is passed from the environment and should (currently) be ignored.
 1252pengine_main_loop(ID) :-
 1253    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1254
 1255pengine_aborted(ID) :-
 1256    thread_self(Self),
 1257    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1258    empty_queue,
 1259    destroy_or_continue(abort(ID)).
 guarded_main_loop(+Pengine) is det
Executes state `2' of the pengine, where it waits for two events:
destroy
Terminate the pengine
ask(:Goal, +Options)
Solve Goal.
 1272guarded_main_loop(ID) :-
 1273    pengine_request(Request),
 1274    (   Request = destroy
 1275    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1276        pengine_terminate(ID)
 1277    ;   Request = ask(Goal, Options)
 1278    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1279        ask(ID, Goal, Options)
 1280    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1281        pengine_reply(error(ID, error(protocol_error, _))),
 1282        guarded_main_loop(ID)
 1283    ).
 1284
 1285
 1286pengine_terminate(ID) :-
 1287    pengine_reply(destroy(ID)),
 1288    thread_self(Me),            % Make the thread silently disappear
 1289    thread_detach(Me).
 solve(+Chunk, +Template, :Goal, +ID) is det
Solve Goal. Note that because we can ask for a new goal in state `6', we must provide for an ancesteral cut (prolog_cut_to/1). We need to be sure to have a choice point before we can call prolog_current_choice/1. This is the reason why this predicate has two clauses.
 1300solve(Chunk, Template, Goal, ID) :-
 1301    prolog_current_choice(Choice),
 1302    State = count(Chunk),
 1303    statistics(cputime, Epoch),
 1304    Time = time(Epoch),
 1305    nb_current('$variable_names', Bindings),
 1306    filter_template(Template, Bindings, Template2),
 1307    '$current_typein_module'(CurrTypeIn),
 1308    (   '$set_typein_module'(ID),
 1309        call_cleanup(catch(findnsols_no_empty(State, Template2,
 1310                                              set_projection(Goal, Bindings),
 1311                                              Result),
 1312                           Error, true),
 1313                     query_done(Det, CurrTypeIn)),
 1314        arg(1, Time, T0),
 1315        statistics(cputime, T1),
 1316        CPUTime is T1-T0,
 1317        (   var(Error)
 1318        ->  projection(Projection),
 1319            (   var(Det)
 1320            ->  pengine_reply(success(ID, Result, Projection,
 1321                                      CPUTime, true)),
 1322                more_solutions(ID, Choice, State, Time)
 1323            ;   !,                      % commit
 1324                destroy_or_continue(success(ID, Result, Projection,
 1325                                            CPUTime, false))
 1326            )
 1327        ;   !,                          % commit
 1328            (   Error == abort_query
 1329            ->  throw(Error)
 1330            ;   destroy_or_continue(error(ID, Error))
 1331            )
 1332        )
 1333    ;   !,                              % commit
 1334        arg(1, Time, T0),
 1335        statistics(cputime, T1),
 1336        CPUTime is T1-T0,
 1337        destroy_or_continue(failure(ID, CPUTime))
 1338    ).
 1339solve(_, _, _, _).                      % leave a choice point
 1340
 1341query_done(true, CurrTypeIn) :-
 1342    '$set_typein_module'(CurrTypeIn).
 set_projection(:Goal, +Bindings)
findnsols/4 copies its goal and template to avoid instantiation thereof when it stops after finding N solutions. Using this helper we can a renamed version of Bindings that we can set.
 1351set_projection(Goal, Bindings) :-
 1352    b_setval('$variable_names', Bindings),
 1353    call(Goal).
 1354
 1355projection(Projection) :-
 1356    nb_current('$variable_names', Bindings),
 1357    !,
 1358    maplist(var_name, Bindings, Projection).
 1359projection([]).
 filter_template(+Template0, +Bindings, -Template) is det
Establish the final template. This is there because hooks such as goal_expansion/2 and the SWISH query hooks can modify the set of bindings.
bug
- Projection and template handling is pretty messy.
 1369filter_template(Template0, Bindings, Template) :-
 1370    is_dict(Template0, swish_default_template),
 1371    !,
 1372    dict_create(Template, swish_default_template, Bindings).
 1373filter_template(Template, _Bindings, Template).
 1374
 1375findnsols_no_empty(N, Template, Goal, List) :-
 1376    findnsols(N, Template, Goal, List),
 1377    List \== [].
 1378
 1379destroy_or_continue(Event) :-
 1380    arg(1, Event, ID),
 1381    (   pengine_property(ID, destroy(true))
 1382    ->  thread_self(Me),
 1383        thread_detach(Me),
 1384        pengine_reply(destroy(ID, Event))
 1385    ;   pengine_reply(Event),
 1386        garbage_collect,                % minimise our footprint
 1387        trim_stacks,
 1388        guarded_main_loop(ID)
 1389    ).
 more_solutions(+Pengine, +Choice, +State, +Time)
Called after a solution was found while there can be more. This is state `6' of the state machine. It processes these events:
stop
Go back via state `7' to state `2' (guarded_main_loop/1)
next
Fail. This causes solve/3 to backtrack on the goal asked, providing at most the current chunk solutions.
next(Count)
As next, but sets the new chunk-size to Count.
ask(Goal, Options)
Ask another goal. Note that we must commit the choice point of the previous goal asked for.
 1407more_solutions(ID, Choice, State, Time) :-
 1408    pengine_request(Event),
 1409    more_solutions(Event, ID, Choice, State, Time).
 1410
 1411more_solutions(stop, ID, _Choice, _State, _Time) :-
 1412    !,
 1413    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1414    destroy_or_continue(stop(ID)).
 1415more_solutions(next, ID, _Choice, _State, Time) :-
 1416    !,
 1417    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1418    statistics(cputime, T0),
 1419    nb_setarg(1, Time, T0),
 1420    fail.
 1421more_solutions(next(Count), ID, _Choice, State, Time) :-
 1422    Count > 0,
 1423    !,
 1424    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1425    nb_setarg(1, State, Count),
 1426    statistics(cputime, T0),
 1427    nb_setarg(1, Time, T0),
 1428    fail.
 1429more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1430    !,
 1431    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1432    prolog_cut_to(Choice),
 1433    ask(ID, Goal, Options).
 1434more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1435    !,
 1436    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1437    pengine_terminate(ID).
 1438more_solutions(Event, ID, Choice, State, Time) :-
 1439    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1440    pengine_reply(error(ID, error(protocol_error, _))),
 1441    more_solutions(ID, Choice, State, Time).
 ask(+Pengine, :Goal, +Options)
Migrate from state `2' to `3'. This predicate validates that it is safe to call Goal using safe_goal/1 and then calls solve/3 to prove the goal. It takes care of the chunk(N) option.
 1449ask(ID, Goal, Options) :-
 1450    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1451    !,
 1452    (   var(Error)
 1453    ->  option(template(Template), Options, Goal),
 1454        option(chunk(N), Options, 1),
 1455        solve(N, Template, Goal1, ID)
 1456    ;   pengine_reply(error(ID, Error)),
 1457        guarded_main_loop(ID)
 1458    ).
 prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det
Prepare GoalIn for execution in Pengine. This implies we must perform goal expansion and, if the system is sandboxed, check the sandbox.

Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like to write, but this does not work correctly if the user wishes to expand X:Y while interpreting X not as the module in which to run Y. This happens in the CQL package. Possibly we should disallow this reinterpretation?

 1472prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1473    option(bindings(Bindings), Options, []),
 1474    b_setval('$variable_names', Bindings),
 1475    (   prepare_goal(Goal0, Goal1, Options)
 1476    ->  true
 1477    ;   Goal1 = Goal0
 1478    ),
 1479    get_pengine_module(ID, Module),
 1480    setup_call_cleanup(
 1481        '$set_source_module'(Old, Module),
 1482        expand_goal(Goal1, Goal),
 1483        '$set_source_module'(_, Old)),
 1484    (   pengine_not_sandboxed(ID)
 1485    ->  true
 1486    ;   get_pengine_application(ID, App),
 1487        setting(App:safe_goal_limit, Limit),
 1488        catch(call_with_time_limit(
 1489                  Limit,
 1490                  safe_goal(Module:Goal)), E, true)
 1491    ->  (   var(E)
 1492        ->  true
 1493        ;   E = time_limit_exceeded
 1494        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1495        ;   throw(E)
 1496        )
 1497    ).
 prepare_goal(+Goal0, -Goal1, +Options) is semidet
Pre-preparation hook for running Goal0. The hook runs in the context of the pengine. Goal is the raw goal given to ask. The returned Goal1 is subject to goal expansion (expand_goal/2) and sandbox validation (safe_goal/1) prior to execution. If this goal fails, Goal0 is used for further processing.
Arguments:
Options- provides the options as given to ask
 pengine_not_sandboxed(+Pengine) is semidet
True when pengine does not operate in sandboxed mode. This implies a user must be registered by authentication_hook/3 and the hook pengines:not_sandboxed(User, Application) must succeed.
 1517pengine_not_sandboxed(ID) :-
 1518    pengine_user(ID, User),
 1519    pengine_property(ID, application(App)),
 1520    not_sandboxed(User, App),
 1521    !.
 not_sandboxed(+User, +Application) is semidet
This hook is called to see whether the Pengine must be executed in a protected environment. It is only called after authentication_hook/3 has confirmed the authentity of the current user. If this hook succeeds, both loading the code and executing the query is executed without enforcing sandbox security. Typically, one should:
  1. Provide a safe user authentication hook.
  2. Enable HTTPS in the server or put it behind an HTTPS proxy and ensure that the network between the proxy and the pengine server can be trusted.
 pengine_pull_response(+Pengine, +Options) is det
Pulls a response (an event term) from the slave Pengine if Pengine is a remote process, else does nothing at all. */
 1543pengine_pull_response(Pengine, Options) :-
 1544    pengine_remote(Pengine, Server),
 1545    !,
 1546    remote_pengine_pull_response(Server, Pengine, Options).
 1547pengine_pull_response(_ID, _Options).
 pengine_input(+Prompt, -Term) is det
Sends Prompt to the master (parent) pengine and waits for input. Note that Prompt may be any term, compound as well as atomic. */
 1556pengine_input(Prompt, Term) :-
 1557    pengine_self(Self),
 1558    pengine_parent(Parent),
 1559    pengine_reply(Parent, prompt(Self, Prompt)),
 1560    pengine_request(Request),
 1561    (   Request = input(Input)
 1562    ->  Term = Input
 1563    ;   Request == destroy
 1564    ->  abort
 1565    ;   throw(error(protocol_error,_))
 1566    ).
 pengine_respond(+Pengine, +Input, +Options) is det
Sends a response in the form of the term Input to a slave (child) pengine that has prompted its master (parent) for input.

Defined in terms of pengine_send/3, as follows:

pengine_respond(Pengine, Input, Options) :-
    pengine_send(Pengine, input(Input), Options).

*/

 1583pengine_respond(Pengine, Input, Options) :-
 1584    pengine_send(Pengine, input(Input), Options).
 send_error(+Error) is det
Send an error to my parent. Remove non-readable blobs from the error term first using replace_blobs/2. If the error contains a stack-trace, this is resolved to a string before sending.
 1593send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
 1594    is_list(Frames),
 1595    !,
 1596    with_output_to(string(Stack),
 1597                   print_prolog_backtrace(current_output, Frames)),
 1598    pengine_self(Self),
 1599    replace_blobs(Formal, Formal1),
 1600    replace_blobs(Message, Message1),
 1601    pengine_reply(error(Self, error(Formal1,
 1602                                    context(prolog_stack(Stack), Message1)))).
 1603send_error(Error) :-
 1604    pengine_self(Self),
 1605    replace_blobs(Error, Error1),
 1606    pengine_reply(error(Self, Error1)).
 replace_blobs(Term0, Term) is det
Copy Term0 to Term, replacing non-text blobs. This is required for error messages that may hold streams and other handles to non-readable objects.
 1614replace_blobs(Blob, Atom) :-
 1615    blob(Blob, Type), Type \== text,
 1616    !,
 1617    format(atom(Atom), '~p', [Blob]).
 1618replace_blobs(Term0, Term) :-
 1619    compound(Term0),
 1620    !,
 1621    compound_name_arguments(Term0, Name, Args0),
 1622    maplist(replace_blobs, Args0, Args),
 1623    compound_name_arguments(Term, Name, Args).
 1624replace_blobs(Term, Term).
 1625
 1626
 1627/*================= Remote pengines =======================
 1628*/
 1629
 1630
 1631remote_pengine_create(BaseURL, Options) :-
 1632    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1633        (       option(ask(Query), PengineOptions0),
 1634                \+ option(template(_Template), PengineOptions0)
 1635        ->      PengineOptions = [template(Query)|PengineOptions0]
 1636        ;       PengineOptions = PengineOptions0
 1637        ),
 1638    options_to_dict(PengineOptions, PostData),
 1639    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1640    arg(1, Reply, ID),
 1641    (   option(id(ID2), Options)
 1642    ->  ID = ID2
 1643    ;   true
 1644    ),
 1645    option(alias(Name), Options, ID),
 1646    assert(child(Name, ID)),
 1647    (   (   functor(Reply, create, _)   % actually created
 1648        ;   functor(Reply, output, _)   % compiler messages
 1649        )
 1650    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1651        option(destroy(Destroy), PengineOptions, true),
 1652        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1653    ;   true
 1654    ),
 1655    thread_self(Queue),
 1656    pengine_reply(Queue, Reply).
 1657
 1658options_to_dict(Options, Dict) :-
 1659    select_option(ask(Ask), Options, Options1),
 1660    select_option(template(Template), Options1, Options2),
 1661    !,
 1662    no_numbered_var_in(Ask+Template),
 1663    findall(AskString-TemplateString,
 1664            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1665            [ AskString-TemplateString ]),
 1666    options_to_dict(Options2, Dict0),
 1667    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1668options_to_dict(Options, Dict) :-
 1669    maplist(prolog_option, Options, Options1),
 1670    dict_create(Dict, _, Options1).
 1671
 1672no_numbered_var_in(Term) :-
 1673    sub_term(Sub, Term),
 1674    subsumes_term('$VAR'(_), Sub),
 1675    !,
 1676    domain_error(numbered_vars_free_term, Term).
 1677no_numbered_var_in(_).
 1678
 1679ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1680    numbervars(Ask+Template, 0, _),
 1681    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1682    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1683                                            Template, WOpts
 1684                                          ]),
 1685    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1686
 1687prolog_option(Option0, Option) :-
 1688    create_option_type(Option0, term),
 1689    !,
 1690    Option0 =.. [Name,Value],
 1691    format(string(String), '~k', [Value]),
 1692    Option =.. [Name,String].
 1693prolog_option(Option, Option).
 1694
 1695create_option_type(ask(_),         term).
 1696create_option_type(template(_),    term).
 1697create_option_type(application(_), atom).
 1698
 1699remote_pengine_send(BaseURL, ID, Event, Options) :-
 1700    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1701    thread_self(Queue),
 1702    pengine_reply(Queue, Reply).
 1703
 1704remote_pengine_pull_response(BaseURL, ID, Options) :-
 1705    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1706    thread_self(Queue),
 1707    pengine_reply(Queue, Reply).
 1708
 1709remote_pengine_abort(BaseURL, ID, Options) :-
 1710    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
 1711    thread_self(Queue),
 1712    pengine_reply(Queue, Reply).
 remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
Issue a GET request on Server and unify Reply with the replied term.
 1719remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1720    !,
 1721    server_url(Server, Action, [id=ID], URL),
 1722    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1723              [ post(prolog(Event))     % makes it impossible to interrupt.
 1724              | Options
 1725              ]),
 1726    call_cleanup(
 1727        read_prolog_reply(Stream, Reply),
 1728        close(Stream)).
 1729remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1730    server_url(Server, Action, [id=ID|Params], URL),
 1731    http_open(URL, Stream, Options),
 1732    call_cleanup(
 1733        read_prolog_reply(Stream, Reply),
 1734        close(Stream)).
 1735
 1736remote_post_rec(Server, Action, Data, Reply, Options) :-
 1737    server_url(Server, Action, [], URL),
 1738    probe(Action, URL),
 1739    http_open(URL, Stream,
 1740              [ post(json(Data))
 1741              | Options
 1742              ]),
 1743    call_cleanup(
 1744        read_prolog_reply(Stream, Reply),
 1745        close(Stream)).
 probe(+Action, +URL) is det
Probe the target. This is a good idea before posting a large document and be faced with an authentication challenge. Possibly we should make this an option for simpler scenarios.
 1753probe(create, URL) :-
 1754    !,
 1755    http_open(URL, Stream, [method(options)]),
 1756    close(Stream).
 1757probe(_, _).
 1758
 1759read_prolog_reply(In, Reply) :-
 1760    set_stream(In, encoding(utf8)),
 1761    read(In, Reply0),
 1762    rebind_cycles(Reply0, Reply).
 1763
 1764rebind_cycles(@(Reply, Bindings), Reply) :-
 1765    is_list(Bindings),
 1766    !,
 1767    maplist(bind, Bindings).
 1768rebind_cycles(Reply, Reply).
 1769
 1770bind(Var = Value) :-
 1771    Var = Value.
 1772
 1773server_url(Server, Action, Params, URL) :-
 1774    uri_components(Server, Components0),
 1775    uri_query_components(Query, Params),
 1776    uri_data(path, Components0, Path0),
 1777    atom_concat('pengine/', Action, PAction),
 1778    directory_file_path(Path0, PAction, Path),
 1779    uri_data(path, Components0, Path, Components),
 1780    uri_data(search, Components, Query),
 1781    uri_components(URL, Components).
 pengine_event(?EventTerm) is det
 pengine_event(?EventTerm, +Options) is det
Examines the pengine's event queue and if necessary blocks execution until a term that unifies to Term arrives in the queue. After a term from the queue has been unified to Term, the term is deleted from the queue.

Valid options are:

timeout(+Time)
Time is a float or integer and specifies the maximum time to wait in seconds. If no event has arrived before the time is up EventTerm is bound to the atom timeout.
listen(+Id)
Only listen to events from the pengine identified by Id. */
 1802pengine_event(Event) :-
 1803    pengine_event(Event, []).
 1804
 1805pengine_event(Event, Options) :-
 1806    thread_self(Self),
 1807    option(listen(Id), Options, _),
 1808    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1809    ->  true
 1810    ;   Event = timeout
 1811    ),
 1812    update_remote_destroy(Event).
 1813
 1814update_remote_destroy(Event) :-
 1815    destroy_event(Event),
 1816    arg(1, Event, Id),
 1817    pengine_remote(Id, _Server),
 1818    !,
 1819    pengine_unregister_remote(Id).
 1820update_remote_destroy(_).
 1821
 1822destroy_event(destroy(_)).
 1823destroy_event(destroy(_,_)).
 1824destroy_event(create(_,Features)) :-
 1825    memberchk(answer(Answer), Features),
 1826    !,
 1827    nonvar(Answer),
 1828    destroy_event(Answer).
 pengine_event_loop(:Closure, +Options) is det
Starts an event loop accepting event terms sent to the current pengine or thread. For each such event E, calls ignore(call(Closure, E)). A closure thus acts as a handler for the event. Some events are also treated specially:
create(ID, Term)
The ID is placed in a list of active pengines.
destroy(ID)
The ID is removed from the list of active pengines. When the last pengine ID is removed, the loop terminates.
output(ID, Term)
The predicate pengine_pull_response/2 is called.

Valid options are:

autoforward(+To)
Forwards received event terms to slaves. To is either all, all_but_sender or a Prolog list of NameOrIDs. [not yet implemented]

*/

 1857pengine_event_loop(Closure, Options) :-
 1858    child(_,_),
 1859    !,
 1860    pengine_event(Event),
 1861    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1862    ->  forall(child(_,ID), pengine_send(ID, Event))
 1863    ;   true
 1864    ),
 1865    pengine_event_loop(Event, Closure, Options).
 1866pengine_event_loop(_, _).
 1867
 1868:- meta_predicate
 1869    pengine_process_event(+, 1, -, +). 1870
 1871pengine_event_loop(Event, Closure, Options) :-
 1872    pengine_process_event(Event, Closure, Continue, Options),
 1873    (   Continue == true
 1874    ->  pengine_event_loop(Closure, Options)
 1875    ;   true
 1876    ).
 1877
 1878pengine_process_event(create(ID, T), Closure, Continue, Options) :-
 1879    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
 1880    (   select(answer(First), T, T1)
 1881    ->  ignore(call(Closure, create(ID, T1))),
 1882        pengine_process_event(First, Closure, Continue, Options)
 1883    ;   ignore(call(Closure, create(ID, T))),
 1884        Continue = true
 1885    ).
 1886pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
 1887    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
 1888    ignore(call(Closure, output(ID, Msg))),
 1889    pengine_pull_response(ID, []).
 1890pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
 1891    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
 1892    ignore(call(Closure, debug(ID, Msg))),
 1893    pengine_pull_response(ID, []).
 1894pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
 1895    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
 1896    ignore(call(Closure, prompt(ID, Term))).
 1897pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
 1898    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
 1899    ignore(call(Closure, success(ID, Sol, More))).
 1900pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
 1901    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
 1902    ignore(call(Closure, failure(ID))).
 1903pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
 1904    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
 1905    (   call(Closure, error(ID, Error))
 1906    ->  Continue = true
 1907    ;   forall(child(_,Child), pengine_destroy(Child)),
 1908        throw(Error)
 1909    ).
 1910pengine_process_event(stop(ID), Closure, true, _Options) :-
 1911    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
 1912    ignore(call(Closure, stop(ID))).
 1913pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
 1914    pengine_process_event(Event, Closure, _, Options),
 1915    pengine_process_event(destroy(ID), Closure, Continue, Options).
 1916pengine_process_event(destroy(ID), Closure, true, _Options) :-
 1917    retractall(child(_,ID)),
 1918    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
 1919    ignore(call(Closure, destroy(ID))).
 pengine_rpc(+URL, +Query) is nondet
 pengine_rpc(+URL, +Query, +Options) is nondet
Semantically equivalent to the sequence below, except that the query is executed in (and in the Prolog context of) the pengine server referred to by URL, rather than locally.
  copy_term_nat(Query, Copy),  % attributes are not copied to the server
  call(Copy),			 % executed on server at URL
  Query = Copy.

Valid options are:

chunk(+Integer)
Can be used to reduce the number of network roundtrips being made. See pengine_ask/3.
timeout(+Time)
Wait at most Time seconds for the next event from the server. The default is defined by the setting pengines:time_limit.

Remaining options (except the server option) are passed to pengine_create/1. */

 1948pengine_rpc(URL, Query) :-
 1949    pengine_rpc(URL, Query, []).
 1950
 1951pengine_rpc(URL, Query, M:Options0) :-
 1952    translate_local_sources(Options0, Options1, M),
 1953    (  option(timeout(_), Options1)
 1954    -> Options = Options1
 1955    ;  setting(time_limit, Limit),
 1956       Options = [timeout(Limit)|Options1]
 1957    ),
 1958    term_variables(Query, Vars),
 1959    Template =.. [v|Vars],
 1960    State = destroy(true),              % modified by process_event/4
 1961    setup_call_catcher_cleanup(
 1962        pengine_create([ ask(Query),
 1963                         template(Template),
 1964                         server(URL),
 1965                         id(Id)
 1966                       | Options
 1967                       ]),
 1968        wait_event(Template, State, [listen(Id)|Options]),
 1969        Why,
 1970        pengine_destroy_and_wait(State, Id, Why)).
 1971
 1972pengine_destroy_and_wait(destroy(true), Id, Why) :-
 1973    !,
 1974    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
 1975    pengine_destroy(Id),
 1976    wait_destroy(Id, 10).
 1977pengine_destroy_and_wait(_, _, Why) :-
 1978    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).
 1979
 1980wait_destroy(Id, _) :-
 1981    \+ child(_, Id),
 1982    !.
 1983wait_destroy(Id, N) :-
 1984    pengine_event(Event, [listen(Id),timeout(10)]),
 1985    !,
 1986    (   destroy_event(Event)
 1987    ->  retractall(child(_,Id))
 1988    ;   succ(N1, N)
 1989    ->  wait_destroy(Id, N1)
 1990    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
 1991        pengine_unregister_remote(Id),
 1992        retractall(child(_,Id))
 1993    ).
 1994
 1995wait_event(Template, State, Options) :-
 1996    pengine_event(Event, Options),
 1997    debug(pengine(event), 'Received ~p', [Event]),
 1998    process_event(Event, Template, State, Options).
 1999
 2000process_event(create(_ID, Features), Template, State, Options) :-
 2001    memberchk(answer(First), Features),
 2002    process_event(First, Template, State, Options).
 2003process_event(error(_ID, Error), _Template, _, _Options) :-
 2004    throw(Error).
 2005process_event(failure(_ID, _Time), _Template, _, _Options) :-
 2006    fail.
 2007process_event(prompt(ID, Prompt), Template, State, Options) :-
 2008    pengine_rpc_prompt(ID, Prompt, Reply),
 2009    pengine_send(ID, input(Reply)),
 2010    wait_event(Template, State, Options).
 2011process_event(output(ID, Term), Template, State, Options) :-
 2012    pengine_rpc_output(ID, Term),
 2013    pengine_pull_response(ID, Options),
 2014    wait_event(Template, State, Options).
 2015process_event(debug(ID, Message), Template, State, Options) :-
 2016    debug(pengine(debug), '~w', [Message]),
 2017    pengine_pull_response(ID, Options),
 2018    wait_event(Template, State, Options).
 2019process_event(success(_ID, Solutions, _Proj, _Time, false),
 2020              Template, _, _Options) :-
 2021    !,
 2022    member(Template, Solutions).
 2023process_event(success(ID, Solutions, _Proj, _Time, true),
 2024              Template, State, Options) :-
 2025    (   member(Template, Solutions)
 2026    ;   pengine_next(ID, Options),
 2027        wait_event(Template, State, Options)
 2028    ).
 2029process_event(destroy(ID, Event), Template, State, Options) :-
 2030    !,
 2031    retractall(child(_,ID)),
 2032    nb_setarg(1, State, false),
 2033    debug(pengine(destroy), 'State: ~p~n', [State]),
 2034    process_event(Event, Template, State, Options).
 2035% compatibility with older versions of the protocol.
 2036process_event(success(ID, Solutions, Time, More),
 2037              Template, State, Options) :-
 2038    process_event(success(ID, Solutions, _Proj, Time, More),
 2039                  Template, State, Options).
 2040
 2041
 2042pengine_rpc_prompt(ID, Prompt, Term) :-
 2043    prompt(ID, Prompt, Term0),
 2044    !,
 2045    Term = Term0.
 2046pengine_rpc_prompt(_ID, Prompt, Term) :-
 2047    setup_call_cleanup(
 2048        prompt(Old, Prompt),
 2049        read(Term),
 2050        prompt(_, Old)).
 2051
 2052pengine_rpc_output(ID, Term) :-
 2053    output(ID, Term),
 2054    !.
 2055pengine_rpc_output(_ID, Term) :-
 2056    print(Term).
 prompt(+ID, +Prompt, -Term) is semidet
Hook to handle pengine_input/2 from the remote pengine. If the hooks fails, pengine_rpc/3 calls read/1 using the current prompt.
 2063:- multifile prompt/3.
 output(+ID, +Term) is semidet
Hook to handle pengine_output/1 from the remote pengine. If the hook fails, it calls print/1 on Term.
 2070:- multifile output/2. 2071
 2072
 2073/*================= HTTP handlers =======================
 2074*/
 2075
 2076%   Declare  HTTP  locations  we  serve  and   how.  Note  that  we  use
 2077%   time_limit(inifinite) because pengines have their  own timeout. Also
 2078%   note that we use spawn. This  is   needed  because we can easily get
 2079%   many clients waiting for  some  action   on  a  pengine to complete.
 2080%   Without spawning, we would quickly exhaust   the  worker pool of the
 2081%   HTTP server.
 2082%
 2083%   FIXME: probably we should wait for a   short time for the pengine on
 2084%   the default worker thread. Only if  that   time  has expired, we can
 2085%   call http_spawn/2 to continue waiting on   a  new thread. That would
 2086%   improve the performance and reduce the usage of threads.
 2087
 2088:- http_handler(root(pengine),               http_404([]),
 2089                [ id(pengines) ]). 2090:- http_handler(root(pengine/create),        http_pengine_create,
 2091                [ time_limit(infinite), spawn([]) ]). 2092:- http_handler(root(pengine/send),          http_pengine_send,
 2093                [ time_limit(infinite), spawn([]) ]). 2094:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
 2095                [ time_limit(infinite), spawn([]) ]). 2096:- http_handler(root(pengine/abort),         http_pengine_abort,         []). 2097:- http_handler(root(pengine/detach),        http_pengine_detach,        []). 2098:- http_handler(root(pengine/list),          http_pengine_list,          []). 2099:- http_handler(root(pengine/ping),          http_pengine_ping,          []). 2100:- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []). 2101
 2102:- http_handler(root(pengine/'pengines.js'),
 2103                http_reply_file(library('http/web/js/pengines.js'), []), []). 2104:- http_handler(root(pengine/'plterm.css'),
 2105                http_reply_file(library('http/web/css/plterm.css'), []), []).
 http_pengine_create(+Request)
HTTP POST handler for =/pengine/create=. This API accepts the pengine creation parameters both as application/json and as www-form-encoded. Accepted parameters:
ParameterDefaultComment
formatprologOutput format
applicationpengine_sandboxPengine application
chunk1Chunk-size for results
solutionschunkedIf all, emit all results
ask-The query
template-Output template
src_text""Program
src_url-Program to download
disposition-Download location

Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.

 2132http_pengine_create(Request) :-
 2133    reply_options(Request, [post]),
 2134    !.
 2135http_pengine_create(Request) :-
 2136    memberchk(content_type(CT), Request),
 2137    sub_atom(CT, 0, _, _, 'application/json'),
 2138    !,
 2139    http_read_json_dict(Request, Dict),
 2140    dict_atom_option(format, Dict, Format, prolog),
 2141    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2142    http_pengine_create(Request, Application, Format, Dict).
 2143http_pengine_create(Request) :-
 2144    Optional = [optional(true)],
 2145    OptString = [string|Optional],
 2146    Form = [ format(Format, [default(prolog)]),
 2147             application(Application, [default(pengine_sandbox)]),
 2148             chunk(_, [integer, default(1)]),
 2149             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2150             ask(_, OptString),
 2151             template(_, OptString),
 2152             src_text(_, OptString),
 2153             disposition(_, OptString),
 2154             src_url(_, Optional)
 2155           ],
 2156    http_parameters(Request, Form),
 2157    form_dict(Form, Dict),
 2158    http_pengine_create(Request, Application, Format, Dict).
 2159
 2160dict_atom_option(Key, Dict, Atom, Default) :-
 2161    (   get_dict(Key, Dict, String)
 2162    ->  atom_string(Atom, String)
 2163    ;   Atom = Default
 2164    ).
 2165
 2166form_dict(Form, Dict) :-
 2167    form_values(Form, Pairs),
 2168    dict_pairs(Dict, _, Pairs).
 2169
 2170form_values([], []).
 2171form_values([H|T], Pairs) :-
 2172    arg(1, H, Value),
 2173    nonvar(Value),
 2174    !,
 2175    functor(H, Name, _),
 2176    Pairs = [Name-Value|PairsT],
 2177    form_values(T, PairsT).
 2178form_values([_|T], Pairs) :-
 2179    form_values(T, Pairs).
 http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2184http_pengine_create(Request, Application, Format, Dict) :-
 2185    current_application(Application),
 2186    !,
 2187    allowed(Request, Application),
 2188    authenticate(Request, Application, UserOptions),
 2189    dict_to_options(Dict, Application, CreateOptions0),
 2190    append(UserOptions, CreateOptions0, CreateOptions),
 2191    pengine_uuid(Pengine),
 2192    message_queue_create(Queue, [max_size(25)]),
 2193    setting(Application:time_limit, TimeLimit),
 2194    get_time(Now),
 2195    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2196    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2197    create(Queue, Pengine, CreateOptions, http, Application),
 2198    create_wait_and_output_result(Pengine, Queue, Format,
 2199                                  TimeLimit, Dict),
 2200    gc_abandoned_queues.
 2201http_pengine_create(_Request, Application, Format, _Dict) :-
 2202    Error = existence_error(pengine_application, Application),
 2203    pengine_uuid(ID),
 2204    output_result(Format, error(ID, error(Error, _))).
 2205
 2206
 2207dict_to_options(Dict, Application, CreateOptions) :-
 2208    dict_pairs(Dict, _, Pairs),
 2209    pairs_create_options(Pairs, Application, CreateOptions).
 2210
 2211pairs_create_options([], _, []) :- !.
 2212pairs_create_options([N-V0|T0], App, [Opt|T]) :-
 2213    Opt =.. [N,V],
 2214    pengine_create_option(Opt), N \== user,
 2215    !,
 2216    (   create_option_type(Opt, atom)
 2217    ->  atom_string(V, V0)               % term creation must be done if
 2218    ;   V = V0                           % we created the source and know
 2219    ),                                   % the operators.
 2220    pairs_create_options(T0, App, T).
 2221pairs_create_options([_|T0], App, T) :-
 2222    pairs_create_options(T0, App, T).
 wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit) is det
Wait for the Pengine's Queue and if there is a message, send it to the requester using output_result/1. If Pengine does not answer within the time specified by the setting time_limit, Pengine is aborted and the result is error(time_limit_exceeded, _).
 2233wait_and_output_result(Pengine, Queue, Format, TimeLimit) :-
 2234    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2235                                 [ timeout(TimeLimit)
 2236                                 ]),
 2237              Error, true)
 2238    ->  (   var(Error)
 2239        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2240            ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2241            protect_pengine(Pengine, output_result(Format, Event))
 2242        ;   output_result(Format, died(Pengine))
 2243        )
 2244    ;   time_limit_exceeded(Pengine, Format)
 2245    ).
 create_wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Dict) is det
Intercepts the `solutions=all' case used for downloading results. Dict may contain a disposition key to denote the download location.
 2254create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2255    get_dict(solutions, Dict, all),
 2256    !,
 2257    between(1, infinite, Page),
 2258    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2259                                 [ timeout(TimeLimit)
 2260                                 ]),
 2261              Error, true)
 2262    ->  (   var(Error)
 2263        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 2264            (   destroy_queue_from_http(Pengine, Event, Queue)
 2265            ->  !,
 2266                protect_pengine(Pengine,
 2267                                output_result(Format, page(Page, Event), Dict))
 2268            ;   is_more_event(Event)
 2269            ->  pengine_thread(Pengine, Thread),
 2270                thread_send_message(Thread, pengine_request(next)),
 2271                protect_pengine(Pengine,
 2272                                output_result(Format, page(Page, Event), Dict)),
 2273                fail
 2274            ;   !,
 2275                protect_pengine(Pengine,
 2276                                output_result(Format, page(Page, Event), Dict))
 2277            )
 2278        ;   !, output_result(Format, died(Pengine))
 2279        )
 2280    ;   !, time_limit_exceeded(Pengine, Format)
 2281    ),
 2282    !.
 2283create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, _Dict) :-
 2284    wait_and_output_result(Pengine, Queue, Format, TimeLimit).
 2285
 2286is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
 2287is_more_event(create(_, Options)) :-
 2288    memberchk(answer(Event), Options),
 2289    is_more_event(Event).
 time_limit_exceeded(+Pengine, +Format)
The Pengine did not reply within its time limit. Send a reply to the client in the requested format and interrupt the Pengine.
bug
- Ideally, if the Pengine has destroy set to false, we should get the Pengine back to its main loop. Unfortunately we only have normal exceptions that may be caught by the Pengine and abort which cannot be caught and thus destroys the Pengine.
 2303time_limit_exceeded(Pengine, Format) :-
 2304    call_cleanup(
 2305        pengine_destroy(Pengine, [force(true)]),
 2306        output_result(Format,
 2307                      destroy(Pengine,
 2308                              error(Pengine, time_limit_exceeded)))).
 destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet
Consider destroying the output queue for Pengine after sending Event back to the HTTP client. We can destroy the queue if
To be done
- If the client did not request all output, the queue will not be destroyed. We need some timeout and GC for that.
 2323destroy_queue_from_http(ID, _, Queue) :-
 2324    output_queue(ID, Queue, _),
 2325    !,
 2326    destroy_queue_if_empty(Queue).
 2327destroy_queue_from_http(ID, Event, Queue) :-
 2328    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2329    is_destroy_event(Event),
 2330    !,
 2331    message_queue_property(Queue, size(Waiting)),
 2332    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2333    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2334
 2335is_destroy_event(destroy(_)).
 2336is_destroy_event(destroy(_,_)).
 2337is_destroy_event(create(_, Options)) :-
 2338    memberchk(answer(Event), Options),
 2339    is_destroy_event(Event).
 2340
 2341destroy_queue_if_empty(Queue) :-
 2342    thread_peek_message(Queue, _),
 2343    !.
 2344destroy_queue_if_empty(Queue) :-
 2345    retractall(output_queue(_, Queue, _)),
 2346    message_queue_destroy(Queue).
 gc_abandoned_queues
Check whether there are queues that have been abadoned. This happens if the stream contains output events and not all of them are read by the client.
 2354:- dynamic
 2355    last_gc/1. 2356
 2357gc_abandoned_queues :-
 2358    consider_queue_gc,
 2359    !,
 2360    get_time(Now),
 2361    (   output_queue(_, Queue, Time),
 2362        Now-Time > 15*60,
 2363        retract(output_queue(_, Queue, Time)),
 2364        message_queue_destroy(Queue),
 2365        fail
 2366    ;   retractall(last_gc(_)),
 2367        asserta(last_gc(Now))
 2368    ).
 2369gc_abandoned_queues.
 2370
 2371consider_queue_gc :-
 2372    predicate_property(output_queue(_,_,_), number_of_clauses(N)),
 2373    N > 100,
 2374    (   last_gc(Time),
 2375        get_time(Now),
 2376        Now-Time > 5*60
 2377    ->  true
 2378    ;   \+ last_gc(_)
 2379    ).
 sync_destroy_queue_from_http(+Pengine, +Queue) is det
 sync_delay_destroy_queue(+Pengine, +Queue) is det
Handle destruction of the message queue connecting the HTTP side to the pengine. We cannot delete the queue when the pengine dies because the queue may contain output events. Termination of the pengine and finishing the HTTP exchange may happen in both orders. This means we need handle this using synchronization.
sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called (indirectly) from pengine_done/1 if the pengine's thread dies.
sync_destroy_queue_from_http(+Pengine, +Queue)
Called from destroy_queue/3, from wait_and_output_result/4, i.e., from the HTTP side.
 2397:- dynamic output_queue_destroyed/1. 2398
 2399sync_destroy_queue_from_http(ID, Queue) :-
 2400    (   output_queue(ID, Queue, _)
 2401    ->  destroy_queue_if_empty(Queue)
 2402    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2403    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2404              [Queue]),
 2405        get_time(Now),
 2406        asserta(output_queue(ID, Queue, Now))
 2407    ;   message_queue_destroy(Queue),
 2408        asserta(output_queue_destroyed(Queue))
 2409    ).
 sync_destroy_queue_from_pengine(+Pengine, +Queue)
Called from pengine_unregister/1 when the pengine thread terminates. It is called while the mutex pengine held.
 2416sync_destroy_queue_from_pengine(ID, Queue) :-
 2417    (   retract(output_queue_destroyed(Queue))
 2418    ->  true
 2419    ;   get_time(Now),
 2420        asserta(output_queue(ID, Queue, Now))
 2421    ),
 2422    retractall(pengine_queue(ID, Queue, _, _)).
 2423
 2424
 2425http_pengine_send(Request) :-
 2426    reply_options(Request, [get,post]),
 2427    !.
 2428http_pengine_send(Request) :-
 2429    http_parameters(Request,
 2430                    [ id(ID, [ type(atom) ]),
 2431                      event(EventString, [optional(true)]),
 2432                      format(Format, [default(prolog)])
 2433                    ]),
 2434    catch(read_event(ID, Request, Format, EventString, Event),
 2435          Error,
 2436          true),
 2437    (   var(Error)
 2438    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
 2439        (   pengine_thread(ID, Thread)
 2440        ->  pengine_queue(ID, Queue, TimeLimit, _),
 2441            random_delay,
 2442            broadcast(pengine(send(ID, Event))),
 2443            thread_send_message(Thread, pengine_request(Event)),
 2444            wait_and_output_result(ID, Queue, Format, TimeLimit)
 2445        ;   atom(ID)
 2446        ->  pengine_died(Format, ID)
 2447        ;   http_404([], Request)
 2448        )
 2449    ;   Error = error(existence_error(pengine, ID), _)
 2450    ->  pengine_died(Format, ID)
 2451    ;   output_result(Format, error(ID, Error))
 2452    ).
 2453
 2454pengine_died(Format, Pengine) :-
 2455    output_result(Format, error(Pengine,
 2456                                error(existence_error(pengine, Pengine),_))).
 read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
Read an event on behalve of Pengine. Note that the pengine's module should not be deleted while we are reading using its syntax (module). This is ensured using the pengine_done mutex.
See also
- pengine_done/0.
 2467read_event(Pengine, Request, Format, EventString, Event) :-
 2468    protect_pengine(
 2469        Pengine,
 2470        ( get_pengine_module(Pengine, Module),
 2471          read_event_2(Request, EventString, Module, Event0, Bindings)
 2472        )),
 2473    !,
 2474    fix_bindings(Format, Event0, Bindings, Event).
 2475read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2476    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2477    discard_post_data(Request),
 2478    existence_error(pengine, Pengine).
 read_event_(+Request, +EventString, +Module, -Event, -Bindings)
Read the sent event. The event is a Prolog term that is either in the event parameter or as a posted document.
 2486read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2487    nonvar(EventString),
 2488    !,
 2489    term_string(Event, EventString,
 2490                [ variable_names(Bindings),
 2491                  module(Module)
 2492                ]).
 2493read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2494    option(method(post), Request),
 2495    http_read_data(Request,     Event,
 2496                   [ content_type('application/x-prolog'),
 2497                     module(Module),
 2498                     variable_names(Bindings)
 2499                   ]).
 discard_post_data(+Request) is det
If this is a POST request, discard the posted data.
 2505discard_post_data(Request) :-
 2506    option(method(post), Request),
 2507    !,
 2508    setup_call_cleanup(
 2509        open_null_stream(NULL),
 2510        http_read_data(Request, _, [to(stream(NULL))]),
 2511        close(NULL)).
 2512discard_post_data(_).
 fix_bindings(+Format, +EventIn, +Bindings, -Event) is det
Generate the template for json(-s) Format from the variables in the asked Goal. Variables starting with an underscore, followed by an capital letter are ignored from the template.
 2520fix_bindings(Format,
 2521             ask(Goal, Options0), Bindings,
 2522             ask(Goal, NewOptions)) :-
 2523    json_lang(Format),
 2524    !,
 2525    exclude(anon, Bindings, NamedBindings),
 2526    template(NamedBindings, Template, Options0, Options1),
 2527    select_option(chunk(Paging), Options1, Options2, 1),
 2528    NewOptions = [ template(Template),
 2529                   chunk(Paging),
 2530                   bindings(NamedBindings)
 2531                 | Options2
 2532                 ].
 2533fix_bindings(_, Command, _, Command).
 2534
 2535template(_, Template, Options0, Options) :-
 2536    select_option(template(Template), Options0, Options),
 2537    !.
 2538template(Bindings, Template, Options, Options) :-
 2539    dict_create(Template, swish_default_template, Bindings).
 2540
 2541anon(Name=_) :-
 2542    sub_atom(Name, 0, _, _, '_'),
 2543    sub_atom(Name, 1, 1, _, Next),
 2544    char_type(Next, prolog_var_start).
 2545
 2546var_name(Name=_, Name).
 json_lang(+Format) is semidet
True if Format is a JSON variation.
 2553json_lang(json) :- !.
 2554json_lang(Format) :-
 2555    sub_atom(Format, 0, _, _, 'json-').
 http_pengine_pull_response(+Request)
HTTP handler for /pengine/pull_response. Pulls possible pending messages from the pengine.
 2562http_pengine_pull_response(Request) :-
 2563    reply_options(Request, [get]),
 2564    !.
 2565http_pengine_pull_response(Request) :-
 2566    http_parameters(Request,
 2567            [   id(ID, []),
 2568                format(Format, [default(prolog)])
 2569            ]),
 2570    reattach(ID),
 2571    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2572        ->  true
 2573        ;   output_queue(ID, Queue, _),
 2574            TimeLimit = 0
 2575        )
 2576    ->  wait_and_output_result(ID, Queue, Format, TimeLimit)
 2577    ;   http_404([], Request)
 2578    ).
 http_pengine_abort(+Request)
HTTP handler for /pengine/abort. Note that abort may be sent at any time and the reply may be handled by a pull_response. In that case, our pengine has already died before we get to wait_and_output_result/4.
 2587http_pengine_abort(Request) :-
 2588    reply_options(Request, [get,post]),
 2589    !.
 2590http_pengine_abort(Request) :-
 2591    http_parameters(Request,
 2592            [   id(ID, [])
 2593            ]),
 2594    (   pengine_thread(ID, _Thread)
 2595    ->  broadcast(pengine(abort(ID))),
 2596        abort_pending_output(ID),
 2597        pengine_abort(ID),
 2598        reply_json(true)
 2599    ;   http_404([], Request)
 2600    ).
 http_pengine_detach(+Request)
Detach a Pengine while keeping it running. This has the following consequences:
 2612http_pengine_detach(Request) :-
 2613    reply_options(Request, [post]),
 2614    !.
 2615http_pengine_detach(Request) :-
 2616    http_parameters(Request,
 2617                    [ id(ID, [])
 2618                    ]),
 2619    http_read_json_dict(Request, ClientData),
 2620    (   pengine_property(ID, application(Application)),
 2621        allowed(Request, Application),
 2622        authenticate(Request, Application, _UserOptions)
 2623    ->  broadcast(pengine(detach(ID))),
 2624        get_time(Now),
 2625        assertz(pengine_detached(ID, ClientData.put(time, Now))),
 2626        pengine_queue(ID, Queue, _TimeLimit, _Now),
 2627        message_queue_set(Queue, max_size(1000)),
 2628        pengine_reply(Queue, detached(ID)),
 2629        reply_json(true)
 2630    ;   http_404([], Request)
 2631    ).
 2632
 2633:- if(\+current_predicate(message_queue_set/2)). 2634message_queue_set(_,_).
 2635:- endif. 2636
 2637reattach(ID) :-
 2638    (   retract(pengine_detached(ID, _Data)),
 2639        pengine_queue(ID, Queue, _TimeLimit, _Now)
 2640    ->  message_queue_set(Queue, max_size(25))
 2641    ;   true
 2642    ).
 http_pengine_destroy_all(+Request)
Destroy a list of pengines. Normally called by pengines.js if the browser window is closed.
 2650http_pengine_destroy_all(Request) :-
 2651    reply_options(Request, [get,post]),
 2652    !.
 2653http_pengine_destroy_all(Request) :-
 2654    http_parameters(Request,
 2655                    [ ids(IDsAtom, [])
 2656                    ]),
 2657    atomic_list_concat(IDs, ',', IDsAtom),
 2658    forall(( member(ID, IDs),
 2659             \+ pengine_detached(ID, _)
 2660           ),
 2661           pengine_destroy(ID, [force(true)])),
 2662    reply_json("ok").
 http_pengine_ping(+Request)
HTTP handler for /pengine/ping. If the requested Pengine is alive and event status(Pengine, Stats) is created, where Stats is the return of thread_statistics/2.
 2670http_pengine_ping(Request) :-
 2671    reply_options(Request, [get]),
 2672    !.
 2673http_pengine_ping(Request) :-
 2674    http_parameters(Request,
 2675                    [ id(Pengine, []),
 2676                      format(Format, [default(prolog)])
 2677                    ]),
 2678    (   pengine_thread(Pengine, Thread),
 2679        Error = error(_,_),
 2680        catch(thread_statistics(Thread, Stats), Error, fail)
 2681    ->  output_result(Format, ping(Pengine, Stats))
 2682    ;   output_result(Format, died(Pengine))
 2683    ).
 http_pengine_list(+Request)
HTTP handler for `/pengine/list`, providing information about running Pengines.
To be done
- Only list detached Pengines associated to the logged in user.
 2692http_pengine_list(Request) :-
 2693    reply_options(Request, [get]),
 2694    !.
 2695http_pengine_list(Request) :-
 2696    http_parameters(Request,
 2697                    [ status(Status, [default(detached), oneof([detached])]),
 2698                      application(Application, [default(pengine_sandbox)])
 2699                    ]),
 2700    allowed(Request, Application),
 2701    authenticate(Request, Application, _UserOptions),
 2702    findall(Term, listed_pengine(Application, Status, Term), Terms),
 2703    reply_json(json{pengines: Terms}).
 2704
 2705listed_pengine(Application, detached, State) :-
 2706    State = pengine{id:Id,
 2707                    detached:Time,
 2708                    queued:Queued,
 2709                    stats:Stats},
 2710
 2711    pengine_property(Id, application(Application)),
 2712    pengine_property(Id, detached(Time)),
 2713    pengine_queue(Id, Queue, _TimeLimit, _Now),
 2714    message_queue_property(Queue, size(Queued)),
 2715    (   pengine_thread(Id, Thread),
 2716        catch(thread_statistics(Thread, Stats), _, fail)
 2717    ->  true
 2718    ;   Stats = thread{status:died}
 2719    ).
 output_result(+Format, +EventTerm) is det
 output_result(+Format, +EventTerm, +OptionsDict) is det
Formulate an HTTP response from a pengine event term. Format is one of prolog, json or json-s.
 2728:- dynamic
 2729    pengine_replying/2.             % +Pengine, +Thread
 2730
 2731output_result(Format, Event) :-
 2732    arg(1, Event, Pengine),
 2733    thread_self(Thread),
 2734    cors_enable,            % contingent on http:cors setting
 2735    disable_client_cache,
 2736    setup_call_cleanup(
 2737        asserta(pengine_replying(Pengine, Thread), Ref),
 2738        catch(output_result(Format, Event, _{}),
 2739              pengine_abort_output,
 2740              true),
 2741        erase(Ref)).
 2742
 2743output_result(Lang, Event, Dict) :-
 2744    write_result(Lang, Event, Dict),
 2745    !.
 2746output_result(prolog, Event, _) :-
 2747    !,
 2748    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2749    write_term(Event,
 2750               [ quoted(true),
 2751                 ignore_ops(true),
 2752                 fullstop(true),
 2753                 blobs(portray),
 2754                 portray_goal(portray_blob),
 2755                 nl(true)
 2756               ]).
 2757output_result(Lang, Event, _) :-
 2758    json_lang(Lang),
 2759    !,
 2760    (   event_term_to_json_data(Event, JSON, Lang)
 2761    ->  reply_json(JSON)
 2762    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2763    ).
 2764output_result(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2765    domain_error(pengine_format, Lang).
 portray_blob(+Blob, +Options) is det
Portray non-text blobs that may appear in output terms. Not really sure about that. Basically such terms need to be avoided as they are meaningless outside the process. The generated error is hard to debug though, so now we send them as '$BLOB'(Type). Future versions may include more info, depending on Type.
 2775:- public portray_blob/2.               % called from write-term
 2776portray_blob(Blob, _Options) :-
 2777    blob(Blob, Type),
 2778    writeq('$BLOB'(Type)).
 abort_pending_output(+Pengine) is det
If we get an abort, it is possible that output is being produced for the client. This predicate aborts these threads.
 2785abort_pending_output(Pengine) :-
 2786    forall(pengine_replying(Pengine, Thread),
 2787           abort_output_thread(Thread)).
 2788
 2789abort_output_thread(Thread) :-
 2790    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2791          error(existence_error(thread, _), _),
 2792          true).
 write_result(+Lang, +Event, +Dict) is semidet
Hook that allows for different output formats. The core Pengines library supports prolog and various JSON dialects. The hook event_to_json/3 can be used to refine the JSON dialects. This hook must be used if a completely different output format is desired.
 disable_client_cache
Make sure the client will not cache our page.
See also
- http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 2808disable_client_cache :-
 2809    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
 2810            Pragma: no-cache\r\n\c
 2811            Expires: 0\r\n').
 2812
 2813event_term_to_json_data(Event, JSON, Lang) :-
 2814    event_to_json(Event, JSON, Lang),
 2815    !.
 2816event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2817                        json{event:success, id:ID, time:Time,
 2818                             data:Bindings, more:More, projection:Projection},
 2819                        json) :-
 2820    !,
 2821    term_to_json(Bindings0, Bindings).
 2822event_term_to_json_data(destroy(ID, Event),
 2823                        json{event:destroy, id:ID, data:JSON},
 2824                        Style) :-
 2825    !,
 2826    event_term_to_json_data(Event, JSON, Style).
 2827event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2828    !,
 2829    (   select(answer(First0), Features0, Features1)
 2830    ->  event_term_to_json_data(First0, First, Style),
 2831        Features = [answer(First)|Features1]
 2832    ;   Features = Features0
 2833    ),
 2834    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2835event_term_to_json_data(destroy(ID, Event),
 2836                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2837    !,
 2838    event_term_to_json_data(Event, JSON, Style).
 2839event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2840    !,
 2841    Error0 = json{event:error, id:ID, data:Message},
 2842    add_error_details(ErrorTerm, Error0, Error),
 2843    message_to_string(ErrorTerm, Message).
 2844event_term_to_json_data(failure(ID, Time),
 2845                        json{event:failure, id:ID, time:Time}, _) :-
 2846    !.
 2847event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2848    functor(EventTerm, F, 1),
 2849    !,
 2850    arg(1, EventTerm, ID).
 2851event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2852    functor(EventTerm, F, 2),
 2853    arg(1, EventTerm, ID),
 2854    arg(2, EventTerm, Data),
 2855    term_to_json(Data, JSON).
 2856
 2857:- public add_error_details/3.
 add_error_details(+Error, +JSON0, -JSON)
Add format error code and location information to an error. Also used by pengines_io.pl.
 2864add_error_details(Error, JSON0, JSON) :-
 2865    add_error_code(Error, JSON0, JSON1),
 2866    add_error_location(Error, JSON1, JSON).
 add_error_code(+Error, +JSON0, -JSON) is det
Add a code field to JSON0 of Error is an ISO error term. The error code is the functor name of the formal part of the error, e.g., syntax_error, type_error, etc. Some errors carry more information:
existence_error(Type, Obj)
{arg1:Type, arg2:Obj}, where Obj is stringified of it is not atomic.
 2879add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 2880    atom(Type),
 2881    !,
 2882    to_atomic(Obj, Value),
 2883    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 2884add_error_code(error(Formal, _), Error0, Error) :-
 2885    callable(Formal),
 2886    !,
 2887    functor(Formal, Code, _),
 2888    Error = Error0.put(code, Code).
 2889add_error_code(_, Error, Error).
 2890
 2891% What to do with large integers?
 2892to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 2893to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 2894to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 2895to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 add_error_location(+Error, +JSON0, -JSON) is det
Add a location property if the error can be associated with a source location. The location is an object with properties file and line and, if available, the character location in the line.
 2904add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 2905    atom(Path), integer(Line),
 2906    !,
 2907    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 2908add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 2909    atom(Path), integer(Line), integer(Ch),
 2910    !,
 2911    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 2912add_error_location(_, Term, Term).
 event_to_json(+Event, -JSONTerm, +Lang) is semidet
Hook that translates a Pengine event structure into a term suitable for reply_json/1, according to the language specification Lang. This can be used to massage general Prolog terms, notably associated with success(ID, Bindings, Projection, Time, More) and output(ID, Term) into a format suitable for processing at the client side.
 2923%:- multifile pengines:event_to_json/3.
 2924
 2925
 2926                 /*******************************
 2927                 *        ACCESS CONTROL        *
 2928                 *******************************/
 allowed(+Request, +Application) is det
Check whether the peer is allowed to connect. Returns a forbidden header if contact is not allowed.
 2935allowed(Request, Application) :-
 2936    setting(Application:allow_from, Allow),
 2937    match_peer(Request, Allow),
 2938    setting(Application:deny_from, Deny),
 2939    \+ match_peer(Request, Deny),
 2940    !.
 2941allowed(Request, _Application) :-
 2942    memberchk(request_uri(Here), Request),
 2943    throw(http_reply(forbidden(Here))).
 2944
 2945match_peer(_, Allowed) :-
 2946    memberchk(*, Allowed),
 2947    !.
 2948match_peer(_, []) :- !, fail.
 2949match_peer(Request, Allowed) :-
 2950    http_peer(Request, Peer),
 2951    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 2952    (   memberchk(Peer, Allowed)
 2953    ->  true
 2954    ;   member(Pattern, Allowed),
 2955        match_peer_pattern(Pattern, Peer)
 2956    ).
 2957
 2958match_peer_pattern(Pattern, Peer) :-
 2959    ip_term(Pattern, IP),
 2960    ip_term(Peer, IP),
 2961    !.
 2962
 2963ip_term(Peer, Pattern) :-
 2964    split_string(Peer, ".", "", PartStrings),
 2965    ip_pattern(PartStrings, Pattern).
 2966
 2967ip_pattern([], []).
 2968ip_pattern([*], _) :- !.
 2969ip_pattern([S|T0], [N|T]) :-
 2970    number_string(N, S),
 2971    ip_pattern(T0, T).
 authenticate(+Request, +Application, -UserOptions:list) is det
Call authentication_hook/3, returning either [user(User)], [] or an exception.
 2979authenticate(Request, Application, UserOptions) :-
 2980    authentication_hook(Request, Application, User),
 2981    !,
 2982    must_be(ground, User),
 2983    UserOptions = [user(User)].
 2984authenticate(_, _, []).
 authentication_hook(+Request, +Application, -User) is semidet
This hook is called from the =/pengine/create= HTTP handler to discover whether the server is accessed by an authorized user. It can react in three ways:
See also
- http_authenticate/3 can be used to implement this hook using default HTTP authentication data.
 3006pengine_register_user(Options) :-
 3007    option(user(User), Options),
 3008    !,
 3009    pengine_self(Me),
 3010    asserta(pengine_user(Me, User)).
 3011pengine_register_user(_).
 pengine_user(-User) is semidet
True when the pengine was create by an HTTP request that authorized User.
See also
- authentication_hook/3 can be used to extract authorization from the HTTP header.
 3022pengine_user(User) :-
 3023    pengine_self(Me),
 3024    pengine_user(Me, User).
 reply_options(+Request, +Methods) is semidet
Reply the HTTP OPTIONS request
 3030reply_options(Request, Allowed) :-
 3031    option(method(options), Request),
 3032    !,
 3033    cors_enable(Request,
 3034                [ methods(Allowed)
 3035                ]),
 3036    format('Content-type: text/plain\r\n'),
 3037    format('~n').                   % empty body
 3038
 3039
 3040                 /*******************************
 3041                 *        COMPILE SOURCE        *
 3042                 *******************************/
 pengine_src_text(+SrcText, +Module) is det
Asserts the clauses defined in SrcText in the private database of the current Pengine. This predicate processes the `src_text' option of pengine_create/1. */
 3051pengine_src_text(Src, Module) :-
 3052    pengine_self(Self),
 3053    format(atom(ID), 'pengine://~w/src', [Self]),
 3054    extra_load_options(Self, Options),
 3055    setup_call_cleanup(
 3056        open_chars_stream(Src, Stream),
 3057        load_files(Module:ID,
 3058                   [ stream(Stream),
 3059                     module(Module),
 3060                     silent(true)
 3061                   | Options
 3062                   ]),
 3063        close(Stream)),
 3064    keep_source(Self, ID, Src).
 3065
 3066system:'#file'(File, _Line) :-
 3067    prolog_load_context(stream, Stream),
 3068    set_stream(Stream, file_name(File)),
 3069    set_stream(Stream, record_position(false)),
 3070    set_stream(Stream, record_position(true)).
 pengine_src_url(+URL, +Module) is det
Asserts the clauses defined in URL in the private database of the current Pengine. This predicate processes the `src_url' option of pengine_create/1.
To be done
- : make a sensible guess at the encoding.
 3080pengine_src_url(URL, Module) :-
 3081    pengine_self(Self),
 3082    uri_encoded(path, URL, Path),
 3083    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 3084    extra_load_options(Self, Options),
 3085    (   get_pengine_application(Self, Application),
 3086        setting(Application:debug_info, false)
 3087    ->  setup_call_cleanup(
 3088            http_open(URL, Stream, []),
 3089            ( set_stream(Stream, encoding(utf8)),
 3090              load_files(Module:ID,
 3091                         [ stream(Stream),
 3092                           module(Module)
 3093                         | Options
 3094                         ])
 3095            ),
 3096            close(Stream))
 3097    ;   setup_call_cleanup(
 3098            http_open(URL, TempStream, []),
 3099            ( set_stream(TempStream, encoding(utf8)),
 3100              read_string(TempStream, _, Src)
 3101            ),
 3102            close(TempStream)),
 3103        setup_call_cleanup(
 3104            open_chars_stream(Src, Stream),
 3105            load_files(Module:ID,
 3106                       [ stream(Stream),
 3107                         module(Module)
 3108                       | Options
 3109                       ]),
 3110            close(Stream)),
 3111        keep_source(Self, ID, Src)
 3112    ).
 3113
 3114
 3115extra_load_options(Pengine, Options) :-
 3116    pengine_not_sandboxed(Pengine),
 3117    !,
 3118    Options = [].
 3119extra_load_options(_, [sandboxed(true)]).
 3120
 3121
 3122keep_source(Pengine, ID, SrcText) :-
 3123    get_pengine_application(Pengine, Application),
 3124    setting(Application:debug_info, true),
 3125    !,
 3126    to_string(SrcText, SrcString),
 3127    assertz(pengine_data(Pengine, source(ID, SrcString))).
 3128keep_source(_, _, _).
 3129
 3130to_string(String, String) :-
 3131    string(String),
 3132    !.
 3133to_string(Atom, String) :-
 3134    atom_string(Atom, String),
 3135    !.
 3136
 3137		 /*******************************
 3138		 *            SANDBOX		*
 3139		 *******************************/
 3140
 3141:- multifile
 3142    sandbox:safe_primitive/1. 3143
 3144sandbox:safe_primitive(pengines:pengine_input(_, _)).
 3145sandbox:safe_primitive(pengines:pengine_output(_)).
 3146sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3147
 3148
 3149                 /*******************************
 3150                 *            MESSAGES          *
 3151                 *******************************/
 3152
 3153prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3154    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3155      'This is normally caused by an insufficiently instantiated'-[], nl,
 3156      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3157      'find all possible instantations of Var.'-[]
 3158    ]