View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2020, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread_httpd,
   38          [ http_current_server/2,      % ?:Goal, ?Port
   39            http_server_property/2,     % ?Port, ?Property
   40            http_server/2,              % :Goal, +Options
   41            http_workers/2,             % +Port, ?WorkerCount
   42            http_add_worker/2,          % +Port, +Options
   43            http_current_worker/2,      % ?Port, ?ThreadID
   44            http_stop_server/2,         % +Port, +Options
   45            http_spawn/2,               % :Goal, +Options
   46
   47            http_requeue/1,             % +Request
   48            http_close_connection/1,    % +Request
   49            http_enough_workers/3       % +Queue, +Why, +Peer
   50          ]).   51:- use_module(library(debug)).   52:- use_module(library(error)).   53:- use_module(library(option)).   54:- use_module(library(socket)).   55:- use_module(library(thread_pool)).   56:- use_module(library(gensym)).   57:- use_module(http_wrapper).   58:- use_module(http_path).   59
   60
   61:- predicate_options(http_server/2, 2,
   62                     [ port(any),
   63                       entry_page(atom),
   64                       tcp_socket(any),
   65                       workers(positive_integer),
   66                       timeout(number),
   67                       keep_alive_timeout(number),
   68                       silent(boolean),
   69                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   70                       pass_to(system:thread_create/3, 3)
   71                     ]).   72:- predicate_options(http_spawn/2, 2,
   73                     [ pool(atom),
   74                       pass_to(system:thread_create/3, 3),
   75                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   76                     ]).   77:- predicate_options(http_add_worker/2, 2,
   78                     [ timeout(number),
   79                       keep_alive_timeout(number),
   80                       max_idle_time(number),
   81                       pass_to(system:thread_create/3, 3)
   82                     ]).

Threaded HTTP server

Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.

This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources.

On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */

  110:- meta_predicate
  111    http_server(1, :),
  112    http_current_server(1, ?),
  113    http_spawn(0, +).  114
  115:- dynamic
  116    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  117    queue_worker/2,         % Queue, ThreadID
  118    queue_options/2.        % Queue, Options
  119
  120:- multifile
  121    make_socket_hook/3,
  122    accept_hook/2,
  123    close_hook/1,
  124    open_client_hook/6,
  125    http:create_pool/1,
  126    http:schedule_workers/1.  127
  128:- meta_predicate
  129    thread_repeat_wait(0).
 http_server(:Goal, :Options) is det
Create a server at Port that calls Goal for each parsed request. Options provide a list of options. Defined options are
port(?Address)
Port to bind to. Address is either a port or a term Host:Port. The port may be a variable, causing the system to select a free port. See tcp_bind/2.
entry_page(+URI)
Affects the message printed while the server is started. Interpreted as a URI relative to the server root.
tcp_socket(+Socket)
If provided, use this socket instead of creating one and binding it to an address. The socket must be bound to an address.
workers(+Count)
Determine the number of worker threads. Default is 5. This is fine for small scale usage. Public servers typically need a higher number.
timeout(+Seconds)
Maximum time of inactivity trying to read the request after a connection has been opened. Default is 60 seconds. See set_stream/1 using the timeout option.
keep_alive_timeout(+Seconds)
Time to keep `Keep alive' connections alive. Default is 2 seconds.
stack_limit(+Bytes)
Stack limit to use for the workers. The default is inherited from the main thread. If you need to control resource usage you may consider the spawn option of http_handler/3 and library(thread_pool).
silent(Bool)
If true (default false), do not print an informational message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:

:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).

start_server(Port) :-
    http_server(http_dispatch, [port(Port)]).

Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.

  %!  http_server(:Goal, :Options) is det.
  %
  %   Start an HTTP server that runs Goal for every parsed request.
  %   Options must contain port(Port).  The listening socket is prepared
  %   by make_socket/3, the worker pool by create_workers/1 and the
  %   accept thread by create_server/3.  Unless silent(true) is among
  %   the user options, an informational message announces the server.
  %
  %   @error existence_error(option, port) if no port(_) option is given.

  http_server(Goal, QOptions) :-
      (   QOptions = M:UserOptions,
          option(port(Port), UserOptions)
      ->  make_socket(Port, M:UserOptions, ServerOptions),
          create_workers(ServerOptions),
          create_server(Goal, Port, ServerOptions),
          (   \+ option(silent(true), UserOptions)
          ->  print_message(informational,
                            httpd_started_server(Port, UserOptions))
          ;   true
          )
      ;   existence_error(option, port)
      ).
 make_socket(?Port, :OptionsIn, -OptionsOut) is det
Create the HTTP server socket and worker pool queue. OptionsOut is guaranteed to hold the option queue(QueueId).
Arguments:
OptionsIn- is qualified to allow passing the module-sensitive ssl option argument.
  %!  make_socket(?Port, :OptionsIn, -OptionsOut) is det.
  %
  %   Prepare the listening socket and name the worker queue.
  %
  %     1. If make_socket_hook/3 succeeds, its result is used as is.
  %     2. If OptionsIn already holds tcp_socket(_), that socket is kept
  %        and only queue(Queue) is added.
  %     3. Otherwise a new TCP socket is created, bound to Port and put
  %        in listening mode with a backlog of 64; both queue(Queue) and
  %        tcp_socket(Socket) are added.
  %
  %   Port may be unbound in case 3; tcp_bind/2 then selects a port.
  %   If configuring, binding or listening raises an exception (e.g.,
  %   the port is in use), the freshly created socket is closed before
  %   the exception is re-thrown, so no socket handle is leaked.

  make_socket(Port, Options0, Options) :-
      make_socket_hook(Port, Options0, Options),
      !.
  make_socket(Port, _:Options0, Options) :-
      option(tcp_socket(_), Options0),
      !,
      make_addr_atom('httpd', Port, Queue),
      Options = [ queue(Queue)
                | Options0
                ].
  make_socket(Port, _:Options0, Options) :-
      tcp_socket(Socket),
      catch(( tcp_setopt(Socket, reuseaddr),
              tcp_bind(Socket, Port),
              tcp_listen(Socket, 64)
            ),
            Error,
            ( % Best effort: do not let a close failure mask Error.
              catch(tcp_close_socket(Socket), _, true),
              throw(Error)
            )),
      make_addr_atom('httpd', Port, Queue),
      Options = [ queue(Queue),
                  tcp_socket(Socket)
                | Options0
                ].
 make_addr_atom(+Scheme, +Address, -Atom) is det
Create an atom that identifies the server's queue and thread resources.
  %!  make_addr_atom(+Scheme, +Address, -Atom) is semidet.
  %
  %   Atom is Scheme, an `@` and the textual form of Address glued
  %   together, e.g. 'httpd@localhost:8080'.  Fails if Address has a
  %   shape address_parts//1 does not recognise.

  make_addr_atom(Scheme, Address, Atom) :-
      phrase(address_parts(Address), AddressParts),
      atomic_list_concat([Scheme, '@'|AddressParts], Atom).

  %!  address_parts(+Address)// is semidet.
  %
  %   Emit the atomic pieces that make up Address.  Address is either
  %   atomic, a Host:Port pair (handled recursively) or ip(A,B,C,D).

  address_parts(Address) -->
      (   { atomic(Address) }
      ->  [Address]
      ;   { Address = Host:Port }
      ->  address_parts(Host), [:], address_parts(Port)
      ;   { Address = ip(A,B,C,D) }
      ->  [ A, '.', B, '.', C, '.', D ]
      ).
 create_server(:Goal, +Address, +Options) is det
Create the main server thread that runs accept_server/2 to listen to new requests.
  %!  create_server(:Goal, +Address, +Options) is det.
  %
  %   Launch the accept thread for the server.  The thread runs
  %   accept_server/3 under an alias derived from the scheme and port.
  %   We block until that thread posts `server_started` to us and only
  %   then register the server in current_server/6.

  create_server(Goal, Address, Options) :-
      get_time(StartTime),
      thread_self(Initiator),
      memberchk(queue(Queue), Options),
      scheme(Scheme, Options),
      autoload_https(Scheme),
      address_port(Address, Port),
      make_addr_atom(Scheme, Port, Alias),
      AcceptLoop = accept_server(Goal, Initiator, Options),
      thread_create(AcceptLoop, _, [alias(Alias)]),
      thread_get_message(server_started),         % handshake with accept thread
      assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  272
  %!  scheme(-Scheme, +Options) is semidet.
  %
  %   Determine the URL scheme of the server.  An explicit
  %   scheme(Scheme) option wins; otherwise the presence of ssl(_) or
  %   ssl_instance(_) selects `https`, and `http` is the fallback.

  scheme(Scheme, Options) :-
      (   option(scheme(Scheme), Options)
      ->  true
      ;   (   option(ssl(_), Options)
          ;   option(ssl_instance(_), Options)
          )
      ->  Scheme = https
      ;   Scheme = http
      ).
  283
  %!  address_port(+Address, -Port) is semidet.
  %
  %   Port is the port part of Address: the right-hand side of a
  %   Host:Port pair, or Address itself when it is not such a pair.

  address_port(Address, Port) :-
      (   Address = _Host:Port
      ->  true
      ;   Port = Address
      ).
  286
  %!  autoload_https(+Scheme) is det.
  %
  %   For the `https` scheme, load library(http/http_ssl_plugin) when
  %   no accept_hook/2 clause is present yet and the plugin source can
  %   be found.  Succeeds without doing anything in all other cases.

  autoload_https(Scheme) :-
      (   Scheme = https,
          \+ clause(accept_hook(_Goal, _Options), _),
          exists_source(library(http/http_ssl_plugin))
      ->  use_module(library(http/http_ssl_plugin))
      ;   true
      ).
 http_current_server(:Goal, ?Port) is nondet
True if Goal is the goal of a server at Port.
deprecated
- Use http_server_property(Port, goal(Goal))
  %!  http_current_server(:Goal, ?Port) is nondet.
  %
  %   True when a registered server at Port was started with Goal.
  %   Reads the current_server/6 registry directly.

  http_current_server(Goal, Port) :-
      current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
 http_server_property(?Port, ?Property) is nondet
True if Property is a property of the HTTP server running at Port. Defined properties are:
goal(:Goal)
Goal used to start the server. This is often http_dispatch/1.
scheme(-Scheme)
Scheme is one of http or https.
start_time(?Time)
Time-stamp when the server was created.
  %!  http_server_property(?Port, ?Property) is nondet.
  %
  %   True when Property holds for the server at Port.  A Host:Port
  %   term with an integer port is reduced to the plain port number
  %   before the registry is consulted.

  http_server_property(Address, Property) :-
      (   Address = _Host:PortNumber,
          integer(PortNumber)
      ->  server_property(Property, PortNumber)
      ;   server_property(Property, Address)
      ).

  %!  server_property(?Property, ?Port) is nondet.
  %
  %   Enumerate goal(Goal), scheme(Scheme) and start_time(Time) from
  %   the current_server/6 registry.

  server_property(goal(Goal), Port) :-
      current_server(Port, Goal, _Thread, _Queue, _Scheme, _Time).
  server_property(scheme(Scheme), Port) :-
      current_server(Port, _Goal, _Thread, _Queue, Scheme, _Time).
  server_property(start_time(Time), Port) :-
      current_server(Port, _Goal, _Thread, _Queue, _Scheme, Time).
 http_workers(+Port, -Workers) is det
http_workers(+Port, +Workers:int) is det
Query or set the number of workers for the server at this port. The number of workers is dynamically modified. Setting it to 1 (one) can be used to profile the worker using tprofile/1.
  %!  http_workers(+Port, -Workers) is det.
  %!  http_workers(+Port, +Workers:nonneg) is det.
  %
  %   Query or set the number of workers of the server at Port.  When
  %   Workers is an integer the pool is resized through resize_pool/2;
  %   otherwise Workers is unified with the number of queue_worker/2
  %   entries of the server's queue.
  %
  %   A requested size must be non-negative.  Without this check a
  %   negative size would make resize_pool/2 send, and then wait for,
  %   more `quit` acknowledgements than there are workers, blocking the
  %   caller forever.
  %
  %   @error instantiation_error if Port is not ground.
  %   @error type_error(nonneg, Workers) if Workers is a negative integer.
  %   @error existence_error(http_server, Port) if no server runs at Port.

  http_workers(Port, Workers) :-
      must_be(ground, Port),
      current_server(Port, _, _, Queue, _, _),
      !,
      (   integer(Workers)
      ->  must_be(nonneg, Workers),
          resize_pool(Queue, Workers)
      ;   findall(W, queue_worker(Queue, W), WorkerIDs),
          length(WorkerIDs, Workers)
      ).
  http_workers(Port, _) :-
      existence_error(http_server, Port).
 http_add_worker(+Port, +Options) is det
Add a new worker to the HTTP server for port Port. Options overrule the default queue options. The following additional options are processed:
max_idle_time(+Seconds)
The created worker will automatically terminate if there is no new work within Seconds.
  %!  http_add_worker(+Port, +Options) is det.
  %
  %   Start one extra worker for the server at Port.  Options take
  %   precedence over the options stored for the server's queue.
  %
  %   @error existence_error(http_server, Port) if no server runs at Port.

  http_add_worker(Port, Options) :-
      must_be(ground, Port),
      (   current_server(Port, _, _, Queue, _, _)
      ->  queue_options(Queue, Defaults),
          merge_options(Options, Defaults, WorkerOptions),
          atom_concat(Queue, '_', AliasBase),
          create_workers(1, 1, Queue, AliasBase, WorkerOptions)
      ;   existence_error(http_server, Port)
      ).
 http_current_worker(?Port, ?ThreadID) is nondet
True if ThreadID is the identifier of a Prolog thread serving Port. This predicate is motivated to allow for the use of arbitrary interaction with the worker thread for development and statistics.
  %!  http_current_worker(?Port, ?ThreadID) is nondet.
  %
  %   True when ThreadID is registered in queue_worker/2 for the queue
  %   of the server at Port.

  http_current_worker(Port, ThreadID) :-
      current_server(Port, _Goal, _Thread, Queue, _Scheme, _StartTime),
      queue_worker(Queue, ThreadID).
 accept_server(:Goal, +Initiator, +Options)
The goal of a small server-thread accepting new requests and posting them to the queue of workers.
  %!  accept_server(:Goal, +Initiator, +Options)
  %
  %   Body of the accept thread.  Runs the accept loop until it is
  %   ended by the exception `http_stop`, then removes this thread's
  %   entry from current_server/6 and closes the server socket.

  accept_server(Goal, Initiator, Options) :-
      catch(accept_server2(Goal, Initiator, Options),
            http_stop,
            true),
      thread_self(Me),
      retract(current_server(_, _, Me, _, _, _)),
      close_server_socket(Options).
  396
  %!  accept_server2(:Goal, +Initiator, +Options)
  %
  %   Tell Initiator that the server is up and then loop forever over
  %   accept_server3/2.  Exceptions accepted by accept_rethrow_error/1
  %   are re-thrown and thereby end the loop.  Any other exception, as
  %   well as failure of accept_server3/2, is printed as an error and
  %   the loop continues.

  accept_server2(Goal, Initiator, Options) :-
      thread_send_message(Initiator, server_started),
      repeat,
        (   catch(accept_server3(Goal, Options), Ball, true)
        ->  nonvar(Ball),                 % clean accept: just loop again
            (   accept_rethrow_error(Ball)
            ->  throw(Ball)
            ;   print_message(error, Ball),
                fail
            )
        ;   print_message(error,      % internal error
                          goal_failed(accept_server3(Goal, Options))),
            fail
        ).

  %!  accept_server3(:Goal, +Options) is semidet.
  %
  %   Accept a single connection.  accept_hook/2 gets the first chance.
  %   Otherwise wait on the tcp_socket(Socket) from Options, post
  %   tcp_client(Client, Goal, Peer) to the worker queue and check that
  %   enough workers are available.

  accept_server3(Goal, Options) :-
      (   accept_hook(Goal, Options)
      ->  true
      ;   memberchk(tcp_socket(Socket), Options),
          memberchk(queue(Queue), Options),
          debug(http(connection), 'Waiting for connection', []),
          tcp_accept(Socket, Client, Peer),
          debug(http(connection), 'New HTTP connection from ~p', [Peer]),
          thread_send_message(Queue, tcp_client(Client, Goal, Peer)),
          http_enough_workers(Queue, accept, Peer)
      ).

  %!  accept_rethrow_error(+Exception) is semidet.
  %
  %   Exceptions that must terminate the accept loop.

  accept_rethrow_error(http_stop).
  accept_rethrow_error('$aborted').
 close_server_socket(+Options)
Close the server socket.
  %!  close_server_socket(+Options) is semidet.
  %
  %   Close the server socket.  close_hook/1 is tried first; if it
  %   fails, the tcp_socket(Socket) found in Options is closed.  Fails
  %   if neither applies.

  close_server_socket(Options) :-
      (   close_hook(Options)
      ->  true
      ;   memberchk(tcp_socket(Socket), Options)
      ->  tcp_close_socket(Socket)
      ).
 http_stop_server(+Port, +Options)
Stop the indicated HTTP server gracefully. First stops all workers, then stops the server.
To be done
- Realise non-graceful stop
  %!  http_stop_server(+Port, +Options)
  %
  %   Stop the server at Port.  A Host:Port term with ground Host is
  %   reduced to its port.  The worker pool is first shrunk to zero,
  %   the stored queue options are removed and `http_stop` is raised in
  %   the accept thread.  A throw-away connection to localhost:Port is
  %   then attempted (errors ignored), presumably to wake up a blocked
  %   accept call.  Finally the accept thread is joined and the queue
  %   destroyed.

  http_stop_server(Address, Options) :-
      (   Address = Host:Port,            % e.g., localhost:4000
          ground(Host)
      ->  http_stop_server(Port, Options)
      ;   % Here Address is used as the plain port.
          http_workers(Address, 0),       % checks the port is ground
          current_server(Address, _, AcceptThread, Queue, _, _),
          retractall(queue_options(Queue, _)),
          thread_signal(AcceptThread, throw(http_stop)),
          catch(connect(localhost:Address), _, true),
          thread_join(AcceptThread, _),
          message_queue_destroy(Queue)
      ).

  %!  connect(+Address)
  %
  %   Open a TCP connection to Address and close the socket again,
  %   whether or not connecting succeeded.

  connect(Address) :-
      setup_call_cleanup(
          tcp_socket(Sock),
          tcp_connect(Sock, Address),
          tcp_close_socket(Sock)).
 http_enough_workers(+Queue, +Why, +Peer) is det
Check that we have enough workers in our queue. If not, call the hook http:schedule_workers/1 to extend the worker pool. This predicate can be used by accept_hook/2.
  %!  http_enough_workers(+Queue, +Why, +Peer) is semidet.
  %
  %   Verify that the pending work on Queue can be picked up.  If the
  %   queue reports a waiting(_) property, nothing needs to be done.
  %   Otherwise the queue size is judged by enough/2; when that is not
  %   satisfied the hook http:schedule_workers/1 is called with a dict
  %   describing the situation.  Hook errors are printed and hook
  %   failure is ignored.

  http_enough_workers(Queue, Why, Peer) :-
      (   message_queue_property(Queue, waiting(Idle))
      ->  debug(http(scheduler), '~D waiting for work; ok', [Idle])
      ;   message_queue_property(Queue, size(Size)),
          (   enough(Size, Why)
          ->  debug(http(scheduler), '~D in queue; ok', [Size])
          ;   ignore(( current_server(Port, _, _, Queue, _, _),
                       Data = _{ port:Port,
                                 reason:Why,
                                 peer:Peer,
                                 waiting:Size,
                                 queue:Queue
                               },
                       debug(http(scheduler),
                             'Asking to reschedule: ~p', [Data]),
                       catch(http:schedule_workers(Data),
                             Error,
                             print_message(error, Error))
                     ))
          )
      ).

  %!  enough(+QueueSize, +Why) is semidet.
  %
  %   True when QueueSize pending messages need no extra workers.

  enough(0, _).
  enough(1, keep_alive).                  % I will be ready myself
 http:schedule_workers(+Data:dict) is semidet
Hook called if a new connection or a keep-alive connection cannot be scheduled immediately to a worker. Dict contains the following keys:
port:Port
Port number that identifies the server.
reason:Reason
One of accept for a new connection or keep_alive if a worker tries to reschedule itself.
peer:Peer
Identify the other end of the connection
waiting:Size
Number of messages waiting in the queue.
queue:Queue
Message queue used to dispatch accepted messages.

Note that, when called with reason:accept, we are called in the time critical main accept loop. An implementation of this hook shall typically send the event to a thread dedicated to dynamic worker-pool management.

See also
- http_add_worker/2 may be used to create (temporary) extra workers.
  528                 /*******************************
  529                 *    WORKER QUEUE OPERATIONS   *
  530                 *******************************/
 create_workers(+Options)
Create the pool of HTTP worker-threads. Each worker's alias is generated with gensym/2 from the queue name followed by an underscore, e.g. httpd@8080_1.
  %!  create_workers(+Options)
  %
  %   Set up the worker pool described by Options.  The message queue
  %   named by queue(Queue) is created (an error from creating it is
  %   ignored), workers(N) threads are started (default 5) and Options
  %   is recorded in queue_options/2 for workers created later.

  create_workers(Options) :-
      option(queue(Queue), Options),
      option(workers(Count), Options, 5),
      catch(message_queue_create(Queue), _, true),
      atom_concat(Queue, '_', AliasBase),
      create_workers(1, Count, Queue, AliasBase, Options),
      assert(queue_options(Queue, Options)).
  544
  %!  create_workers(+I, +N, +Queue, +AliasBase, +Options)
  %
  %   Start workers numbered I through N for Queue.  Each thread runs
  %   http_worker/1, gets an alias generated by gensym/2 from AliasBase
  %   and is recorded in queue_worker/2.  Options is also passed as the
  %   thread_create/3 option list.

  create_workers(I, N, Queue, AliasBase, Options) :-
      (   I > N
      ->  true
      ;   gensym(AliasBase, Alias),
          thread_create(http_worker(Options), WorkerId,
                        [ alias(Alias)
                        | Options
                        ]),
          assertz(queue_worker(Queue, WorkerId)),
          Next is I + 1,
          create_workers(Next, N, Queue, AliasBase, Options)
      ).
 resize_pool(+Queue, +Workers) is det
Create or destroy workers. If workers are destroyed, the call waits until the desired number of workers is reached.
  %!  resize_pool(+Queue, +Size) is det.
  %
  %   Bring the number of workers registered for Queue to Size.  To
  %   grow, the missing workers are created with the stored queue
  %   options.  To shrink, one quit(Me) message per surplus worker is
  %   posted on the queue and we wait for the same number of
  %   quitted(_) replies.

  resize_pool(Queue, Size) :-
      findall(Id, queue_worker(Queue, Id), Current),
      length(Current, Now),
      (   Now == Size
      ->  true
      ;   Now < Size
      ->  queue_options(Queue, Options),
          atom_concat(Queue, '_', AliasBase),
          First is Now+1,
          create_workers(First, Size, Queue, AliasBase, Options)
      ;   Now > Size
      ->  Surplus is Now - Size,
          thread_self(Me),
          forall(between(1, Surplus, _),
                 thread_send_message(Queue, quit(Me))),
          forall(between(1, Surplus, _),
                 thread_get_message(quitted(_)))
      ).
 http_worker(+Options)
Run HTTP worker main loop. Workers simply wait until they are passed an accepted socket to process a client.

If the message quit(Sender) is read from the queue, the worker stops.

  %!  http_worker(+Options)
  %
  %   Main loop of a worker thread.  Jobs are fetched from the queue
  %   named in Options through get_work/3 inside a failure-driven loop.
  %
  %     - quit(Sender) ends the loop: the thread detaches itself and,
  %       unless Sender is `idle`, unregisters from queue_worker/2 and
  %       replies quitted(Self) to Sender.
  %     - Any other job is opened with open_client/7 and handled by
  %       http_process/4.  If that raises or fails, the error is
  %       printed at the level given by current_message_level/2 and
  %       the connection is closed.
  %
  %   done_worker/0 is registered to run when the thread exits.

  http_worker(Options) :-
      debug(http(scheduler), 'New worker', []),
      prolog_listen(this_thread_exit, done_worker),
      option(queue(Queue), Options),
      option(max_idle_time(IdleLimit), Options, infinite),
      thread_repeat_wait(get_work(Queue, Job, IdleLimit)),
        debug(http(worker), 'Waiting for a job ...', []),
        debug(http(worker), 'Got job ~p', [Job]),
        (   Job = quit(Sender)
        ->  !,                            % leave the repeat loop
            thread_self(Me),
            thread_detach(Me),
            (   Sender \== idle
            ->  retract(queue_worker(Queue, Me)),
                thread_send_message(Sender, quitted(Me))
            ;   true
            )
        ;   open_client(Job, Queue, Goal, In, Out,
                        Options, ClientOptions),
            (   catch(http_process(Goal, In, Out, ClientOptions),
                      Problem, true)
            ->  true
            ;   Problem = goal_failed(http_process/4)
            ),
            nonvar(Problem),              % no problem: fetch the next job
            current_message_level(Problem, Level),
            print_message(Level, Problem),
            memberchk(peer(Peer), ClientOptions),
            close_connection(Peer, In, Out),
            fail
        ).
  623
  %!  get_work(+Queue, -Message, +MaxIdle) is det.
  %
  %   Fetch the next Message from Queue.  With MaxIdle = `infinite` we
  %   block until a message arrives.  Otherwise we wait at most MaxIdle
  %   seconds and yield quit(idle) when nothing arrived in time.

  get_work(Queue, Message, MaxIdle) :-
      (   MaxIdle = infinite
      ->  thread_get_message(Queue, Message)
      ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
      ->  true
      ;   Message = quit(idle)
      ).
 open_client(+Message, +Queue, -Goal, -In, -Out, +Options, -ClientOptions) is semidet
Opens the connection to the client in a worker from the message sent to the queue by accept_server/2.
  %!  open_client(+Message, +Queue, -Goal, -In, -Out,
  %!              +Options, -ClientOptions) is semidet.
  %
  %   Turn a queue message into an open client connection.
  %
  %     - requeue(In, Out, Goal, ClientOptions) denotes a keep-alive
  %       connection.  We wait for a new request for at most the
  %       keep_alive_timeout (default 2 seconds); see
  %       check_keep_alive_connection/5.
  %     - Any other message is opened through open_client/6.  The
  %       resulting options are extended with pool(client(...)) and
  %       timeout(Timeout) (default 60).  Errors while opening are
  %       printed by report_error/1, which makes this predicate fail.

  open_client(Message, Queue, Goal, In, Out, Opts, ClientOptions) :-
      (   Message = requeue(In, Out, Goal, ClientOptions)
      ->  memberchk(peer(Peer), ClientOptions),
          option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
          check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out)
      ;   ClientOptions = [ pool(client(Queue, Goal, In, Out)),
                            timeout(Timeout)
                          | PeerOptions
                          ],
          catch(open_client(Message, Goal, In, Out, PeerOptions, Opts),
                Error, report_error(Error)),
          option(timeout(Timeout), Opts, 60),
          (   debugging(http(connection))
          ->  memberchk(peer(Peer), PeerOptions),
              debug(http(connection), 'Opened connection from ~p', [Peer])
          ;   true
          )
      ).
 open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options) is det
  %!  open_client(+Message, +Goal, -In, -Out,
  %!              -ClientOptions, +Options) is semidet.
  %
  %   Open the streams for a new client.  open_client_hook/6 may take
  %   over; otherwise Message must be tcp_client(Socket, Goal, Peer),
  %   for which the socket streams are opened and ClientOptions becomes
  %   [peer(Peer), protocol(http)].

  open_client(Message, Goal, In, Out, ClientOptions, Options) :-
      (   open_client_hook(Message, Goal, In, Out, ClientOptions, Options)
      ->  true
      ;   Message = tcp_client(Socket, Goal, Peer),
          ClientOptions = [ peer(Peer),
                            protocol(http)
                          ],
          tcp_open_socket(Socket, In, Out)
      ).

  %!  report_error(+Error) is failure.
  %
  %   Print Error as an error message and fail.

  report_error(Error) :-
      print_message(error, Error),
      fail.
 check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet
Wait for the client for at most TimeOut seconds. Succeed if the client starts a new request within this time. Otherwise close the connection and fail.
  %!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
  %
  %   Wait up to TimeOut seconds for the client to send more data on
  %   In by peeking at the next character code.  If data arrives, the
  %   previous stream timeout is restored and we succeed.  On
  %   end-of-file or an exception from the peek, the connection is
  %   closed and we fail.

  check_keep_alive_connection(In, TMO, Peer, In, Out) :-
      stream_property(In, timeout(Saved)),
      set_stream(In, timeout(TMO)),
      debug(http(keep_alive), 'Waiting for keep-alive ...', []),
      catch(peek_code(In, Code), Ball, true),
      (   (   nonvar(Ball)                % exception while waiting
          ;   Code == -1                  % end-of-file
          )
      ->  (   Code == -1
          ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
          ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
          ),
          close_connection(Peer, In, Out),
          fail
      ;   set_stream(In, timeout(Saved)),
          debug(http(keep_alive), '\tre-using keep-alive connection', [])
      ).
 done_worker
Called when worker is terminated due to http_workers/2 or a (debugging) exception. In the latter case, recreate_worker/2 creates a new worker.
  %!  done_worker
  %
  %   Exit handler of a worker thread.  If this thread is still listed
  %   in queue_worker/2 it is detached and unregistered, after which
  %   recreate_worker/2 may start a replacement depending on the exit
  %   status.  If it was no longer listed, only the stop message is
  %   printed.  The message level is chosen by
  %   done_status_message_level/2.

  done_worker :-
      thread_self(Self),
      (   thread_detach(Self),
          retract(queue_worker(Queue, Self)),
          thread_property(Self, status(Status))
      ->  (   catch(recreate_worker(Status, Queue), _, fail)
          ->  print_message(informational,
                            httpd_restarted_worker(Self))
          ;   done_status_message_level(Status, Level),
              print_message(Level,
                            httpd_stopped_worker(Self, Status))
          )
      ;   % not registered anymore, e.g. after quit(Sender)
          thread_property(Self, status(Status)),
          done_status_message_level(Status, Level),
          print_message(Level,
                        httpd_stopped_worker(Self, Status))
      ).

  %!  done_status_message_level(+Status, -Level) is det.
  %
  %   Normal completion and abort are reported silently; every other
  %   exit status is reported as informational.

  done_status_message_level(true, silent) :- !.
  done_status_message_level(exception('$aborted'), silent) :- !.
  done_status_message_level(_, informational).
 recreate_worker(+Status, +Queue) is semidet
Deal with the possibility that threads are, during development, killed with abort/0. We recreate the worker to avoid that eventually we run out of workers. If we are aborted due to a halt/0 call, thread_create/3 will raise a permission error.

The first clause deals with the possibility that we cannot write to user_error. This is possible when Prolog is started as a service using some service managers. Would be nice if we could write an error, but where?

  %!  recreate_worker(+Status, +Queue) is semidet.
  %
  %   Decide what to do after a worker died with exit Status.  A write
  %   error on user_error halts the process with exit code 2.  For the
  %   exceptions listed in recreate_on_error/1, one new worker is
  %   created for Queue using the stored queue options.  Fails for any
  %   other status.

  recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
      halt(2).
  recreate_worker(exception(Reason), Queue) :-
      recreate_on_error(Reason),
      atom_concat(Queue, '_', AliasBase),
      queue_options(Queue, Options),
      create_workers(1, 1, Queue, AliasBase, Options).

  %!  recreate_on_error(+Exception) is semidet.
  %
  %   Exceptions after which a dead worker is replaced.

  recreate_on_error('$aborted').
  recreate_on_error(time_limit_exceeded).
 thread_httpd:message_level(+Exception, -Level)
Determine the message stream used for exceptions that may occur during server_loop/5. Being multifile, clauses can be added by the application to refine error handling. See also message_hook/3 for further programming error handling.
  763:- multifile
  764    message_level/2.  765
  % Default message levels for exceptions raised while serving a
  % request.  message_level/2 is multifile, so applications may add
  % clauses.

  message_level(error(io_error(read, _), _),               silent).
  message_level(error(socket_error(epipe,_), _),           silent).
  message_level(error(http_write_short(_Obj,_Written), _), silent).
  message_level(error(timeout_error(read, _), _),          informational).
  message_level(keep_alive_timeout,                        silent).

  %!  current_message_level(+Term, -Level) is det.
  %
  %   Level is the first level message_level/2 gives for Term, or
  %   `error` when no clause applies.

  current_message_level(Term, Level) :-
      message_level(Term, Level),
      !.
  current_message_level(_, error).
 http_requeue(+Header)
Re-queue a connection to the worker pool. This deals with processing additional requests on keep-alive connections.
  %!  http_requeue(+Header) is semidet.
  %
  %   Hand a keep-alive connection back to the worker pool.  Only the
  %   pool, peer and protocol fields of Header are kept.  After
  %   checking the worker supply, a requeue/4 message is posted on the
  %   queue.  On failure a debug message is emitted on topic
  %   http(error) and the predicate fails.

  http_requeue(Header) :-
      (   requeue_header(Header, ClientOptions),
          memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
          memberchk(peer(Peer), ClientOptions),
          http_enough_workers(Queue, keep_alive, Peer),
          thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions))
      ->  true
      ;   debug(http(error), 'Re-queue failed: ~p', [Header]),
          fail
      ).
  794
  %!  requeue_header(+Header, -Kept) is det.
  %
  %   Kept holds, in the original order, the elements of Header that
  %   requeue_keep/1 accepts.

  requeue_header([], []).
  requeue_header([Field|Fields], Kept) :-
      (   Kept = [Field|KeptTail],
          requeue_keep(Field)
      ->  requeue_header(Fields, KeptTail)
      ;   requeue_header(Fields, Kept)
      ).

  %!  requeue_keep(+Field) is semidet.
  %
  %   Request fields that survive a re-queue.

  requeue_keep(pool(_)).
  requeue_keep(peer(_)).
  requeue_keep(protocol(_)).
 http_process(:Goal, +In, +Out, +Options)
Handle a single client message on the given stream.
  %!  http_process(:Goal, +In, +Out, +Options)
  %
  %   Serve one request on the stream pair In/Out.  Both streams get
  %   the timeout from Options (default 60), Goal is run through
  %   http_wrapper/5 and next/2 then decides what happens to the
  %   connection.

  http_process(Goal, In, Out, Options) :-
      debug(http(server), 'Running server goal ~p on ~p -> ~p',
            [Goal, In, Out]),
      option(timeout(Timeout), Options, 60),
      set_stream(In, timeout(Timeout)),
      set_stream(Out, timeout(Timeout)),
      WrapperOptions = [ request(Request)
                       | Options
                       ],
      http_wrapper(Goal, In, Out, Connection, WrapperOptions),
      next(Connection, Request).
  823
  %!  next(+Connection, +Request) is det.
  %
  %   Act on the Connection outcome of a request through next_/2 and
  %   print a warning if that fails.

  next(Connection, Request) :-
      (   next_(Connection, Request)
      ->  true
      ;   print_message(warning, goal_failed(next(Connection,Request)))
      ).

  %!  next_(+Connection, +Request) is semidet.
  %
  %   Dispatch on Connection:
  %
  %     - switch_protocol(Goal, _): call Goal on the raw stream pair;
  %       close the connection if Goal fails or raises.
  %     - spawned(ThreadId): another thread took over; nothing to do.
  %     - `keep-alive` (matched case-insensitively): re-queue the
  %       connection, closing it if re-queueing fails.
  %     - anything else: close the connection.

  next_(Connection, Request) :-
      (   Connection = switch_protocol(SwitchGoal, _SwitchOptions)
      ->  memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
          (   catch(call(SwitchGoal, In, Out), Error,
                    (   print_message(error, Error),
                        fail))
          ->  true
          ;   http_close_connection(Request)
          )
      ;   Connection = spawned(ThreadId)
      ->  debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId])
      ;   downcase_atom(Connection, 'keep-alive'),
          http_requeue(Request)
      ->  true
      ;   http_close_connection(Request)
      ).
 http_close_connection(+Request)
Close connection associated to Request. See also http_requeue/1.
  %!  http_close_connection(+Request) is semidet.
  %
  %   Close the stream pair stored in the pool(client(...)) field of
  %   Request.  Fails if Request lacks the pool or peer field.

  http_close_connection(Request) :-
      memberchk(peer(Peer), Request),
      memberchk(pool(client(_, _, In, Out)), Request),
      close_connection(Peer, In, Out).
 close_connection(+Peer, +In, +Out)
Closes the connection from the server to the client. Errors are currently silently ignored.
  %!  close_connection(+Peer, +In, +Out) is det.
  %
  %   Force-close both streams of a client connection, input first.
  %   Exceptions raised while closing are ignored.  Peer is only used
  %   for the debug message.

  close_connection(Peer, In, Out) :-
      debug(http(connection), 'Closing connection from ~p', [Peer]),
      forall(member(Stream, [In, Out]),
             catch(close(Stream, [force(true)]), _, true)).
 http_spawn(:Goal, +Options) is det
Continue this connection on a new thread. A handler may call http_spawn/2 to start a new thread that continues processing the current request using Goal. The original thread returns to the worker pool for processing new requests. Options are passed to thread_create/3, except for:
pool(+Pool)
Interfaces to library(thread_pool), starting the thread on the given pool.

If a pool does not exist, this predicate calls the multifile hook http:create_pool/1 to create it. If this predicate succeeds the operation is retried.

  %!  http_spawn(:Goal, +Options) is det.
  %
  %   Continue handling the current request in a new detached thread
  %   running Goal, with the current output stream handed over to it.
  %
  %   With pool(Pool) the thread is created in that thread pool.  A
  %   full pool results in throw(http_reply(busy)).  A missing pool is
  %   created through create_pool/1, after which the spawn is retried.
  %   Other errors are re-thrown.  Without pool(_), thread_create/3 is
  %   used with Options.

  http_spawn(Goal, Options) :-
      current_output(CGI),
      (   select_option(pool(Pool), Options, ThreadOptions)
      ->  catch(thread_create_in_pool(Pool,
                                      wrap_spawned(CGI, Goal), Id,
                                      [ detached(true)
                                      | ThreadOptions
                                      ]),
                Error,
                true),
          (   var(Error)
          ->  http_spawned(Id)
          ;   Error = error(resource_error(threads_in_pool(_)), _)
          ->  throw(http_reply(busy))
          ;   Error = error(existence_error(thread_pool, Pool), _),
              create_pool(Pool)
          ->  http_spawn(Goal, Options)
          ;   throw(Error)
          )
      ;   thread_create(wrap_spawned(CGI, Goal), Id,
                        [ detached(true)
                        | Options
                        ]),
          http_spawned(Id)
      ).
  911
  %!  wrap_spawned(+CGI, :Goal)
  %
  %   Entry point of a spawned handler thread.  Makes CGI the current
  %   output, runs Goal through http_wrap_spawned/3 and lets next/2
  %   deal with the connection afterwards.

  wrap_spawned(CGI, Goal) :-
      set_output(CGI),
      http_wrap_spawned(Goal, Request, ConnectionAction),
      next(ConnectionAction, Request).
 create_pool(+Pool)
 Lazy creation of worker-pools for the HTTP server. This predicate calls the hook http:create_pool/1. If the hook fails it creates a default pool of size 10. This should suffice for most typical use cases. Note that we get a permission error if the pool is already created. We can ignore this.
  % create_pool(+Pool)
  %
  % First clause: delegate to the hook http:create_pool/1.  A permission
  % error for creating thread pool Pool raised by the hook is caught and
  % counted as success.
  %
  % Second clause (used when the first one fails): print an informational
  % message and create Pool with size 10 and no further options.
  create_pool(Pool) :-
      catch(http:create_pool(Pool),
            error(permission_error(create, thread_pool, Pool), _),
            true).
  create_pool(Pool) :-
      print_message(informational, httpd(created_pool(Pool))),
      thread_pool_create(Pool, 10, []).
  931
  932
  933		 /*******************************
  934		 *         WAIT POLICIES	*
  935		 *******************************/
  936
  937:- meta_predicate
  938    thread_repeat_wait(0).
 thread_repeat_wait(:Goal) is multi
Acts as repeat, thread_idle(Goal), choosing whether to use a long or short idle time based on the average firing rate.
  % thread_repeat_wait(:Goal) is multi.
  %
  % Repeatedly wait for Goal.  Each iteration updates a moving average of
  % the time between iterations (N=5, resolution 1/1000 second) and maps
  % it to a policy using long/2.  For policy `brief` Goal is called
  % directly; for any other policy Goal is run through thread_idle/2 with
  % that policy.
  thread_repeat_wait(Goal) :-
      new_rate_mma(5, 1000, RateState),
      repeat,
        update_rate_mma(RateState, AvgInterval),
        long(AvgInterval, Policy),
        (   Policy \== brief
        ->  thread_idle(Goal, Policy)
        ;   call(Goal)
        ).
  954
  % long(+MMA, -Policy) is det.
  %
  % Map the average interval MMA (a number, in seconds) to a wait policy:
  %
  %   - `brief` if MMA < 0.05
  %   - `short` if MMA < 1
  %   - `long`  otherwise
  %
  % The classification is computed before Policy is unified, so the
  % predicate is steadfast: calling it with Policy already bound succeeds
  % only if Policy is the policy that belongs to MMA.
  long(MMA, Policy) :-
      (   MMA < 0.05
      ->  Class = brief
      ;   MMA < 1
      ->  Class = short
      ;   Class = long
      ),
      Policy = Class.
 new_rate_mma(+N, +Resolution, -State) is det
 update_rate_mma(!State, -MMA) is det
 Implement Modified Moving Average computing the average time between requests as an exponential moving average with alpha=1/N.
Arguments:
 Resolution - the time resolution, in units of 1/Resolution seconds. All storage is done in integers to avoid the need for stack freezing in nb_setarg/3.
See also
- https://en.wikipedia.org/wiki/Moving_average
  % new_rate_mma(+N, +Resolution, -State) is det.
  %
  % Create the state term used by update_rate_mma/2:
  %
  %     rstate(Base, Last, MaxI, Resolution, N, MMA)
  %
  % Base is the current time, MaxI is the value of the Prolog flag
  % max_tagged_integer, and both Last and MMA start at 0.
  new_rate_mma(N, Resolution, State) :-
      get_time(Base),
      current_prolog_flag(max_tagged_integer, MaxI),
      State = rstate(Base, 0, MaxI, Resolution, N, 0).
  978
  % update_rate_mma(!State, -MMA) is det.
  %
  % Record an event at the current time in State (as created by
  % new_rate_mma/3) and unify MMA with the updated moving average of the
  % interval between events, in seconds.
  %
  % State is rstate(Base, Last, MaxI, Resolution, N, MMA0) and is
  % modified destructively using nb_setarg/3.  Time stamps are integer
  % ticks of 1/Resolution seconds counted from Base.  If the new stamp
  % exceeds MaxI, Base is moved to the current time.  After such a
  % rebase the current event is tick 0 of the new base, so 0 is what gets
  % stored as Last.  Storing the old-base stamp instead would make the
  % next interval hugely negative.
  update_rate_mma(State, MMAr) :-
      State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
      get_time(Now),
      Stamp is round((Now-Base)*Resolution),
      (   Stamp > MaxI
      ->  nb_setarg(1, State, Now),         % rebase: Now becomes tick 0
          NewLast = 0
      ;   NewLast = Stamp
      ),
      Diff is Stamp-Last,                   % both relative to the old base
      nb_setarg(2, State, NewLast),
      MMA is round(((N-1)*MMA0+Diff)/N),
      nb_setarg(6, State, MMA),
      MMAr is MMA/float(Resolution).
  993
  994
  995                 /*******************************
  996                 *            MESSAGES          *
  997                 *******************************/
  998
  :- multifile
      prolog:message/3.

  % Message texts for the terms printed by this library through
  % print_message/2.  The server-started message delegates the URL part
  % to http_root//2.
  prolog:message(httpd_started_server(Address, Options)) -->
      [ 'Started server at '-[] ],
      http_root(Address, Options).
  prolog:message(httpd_stopped_worker(Worker, Status)) -->
      [ 'Stopped worker ~p: ~p'-[Worker, Status] ].
  prolog:message(httpd_restarted_worker(Worker)) -->
      [ 'Replaced aborted worker ~p'-[Worker] ].
  prolog:message(httpd(created_pool(PoolName))) -->
      [ 'Created thread-pool ~p of size 10'-[PoolName], nl,
        'Create this pool at startup-time or define the hook ', nl,
        'http:create_pool/1 to avoid this message and create a ', nl,
        'pool that fits the usage-profile.'
      ].
 1015
 % http_root(+Address, +Options)//
 %
 % Emit the landing page URL of the server at Address, as computed by
 % landing_page/3, as a single '~w' message fragment.
 http_root(Address, Options) -->
     { landing_page(Address, LandingURI, Options) },
     [ '~w'-[LandingURI] ].
 1019
 % landing_page(+Address, -URI, +Options)
 %
 % Compute the URI of the entry page of the server at Address.  For
 % Host:Port, Host must be an atom.  The scheme comes from
 % http_server_property/2, and the port is left out of the base URL when
 % default_port/2 says it is the default for that scheme.  The base URL
 % is then handed to entry_page/3.  The second clause treats its
 % argument as a port on `localhost`.
 landing_page(Host:Port, URI, Options) :-
     must_be(atom, Host),
     http_server_property(Port, scheme(Scheme)),
     (   default_port(Scheme, Port)
     ->  Format = '~w://~w',
         Args = [Scheme, Host]
     ;   Format = '~w://~w:~w',
         Args = [Scheme, Host, Port]
     ),
     format(atom(BaseURL), Format, Args),
     entry_page(BaseURL, URI, Options).
 landing_page(Port, URI, Options) :-
     landing_page(localhost:Port, URI, Options).

 % default_port(?Scheme, ?Port)
 %
 % Port is the port that landing_page/3 omits from URLs using Scheme.
 default_port(http, 80).
 default_port(https, 443).
 1033
 1034entry_page(Base, URI, Options) :-
 1035    option(entry_page(Entry), Options),
 1036    !,
 1037    uri_resolve(Entry, Base, URI).
 1038entry_page(Base, URI, _) :-
 1039    http_absolute_location(root(.), Entry, []),
 1040    uri_resolve(Entry, Base, URI)