View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2024, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(thread_httpd,
   39          [ http_current_server/2,      % ?:Goal, ?Port
   40            http_server_property/2,     % ?Port, ?Property
   41            http_server/2,              % :Goal, +Options
   42            http_workers/2,             % +Port, ?WorkerCount
   43            http_add_worker/2,          % +Port, +Options
   44            http_current_worker/2,      % ?Port, ?ThreadID
   45            http_stop_server/2,         % +Port, +Options
   46            http_spawn/2,               % :Goal, +Options
   47
   48            http_requeue/1,             % +Request
   49            http_close_connection/1,    % +Request
   50            http_enough_workers/3       % +Queue, +Why, +Peer
   51          ]).   52:- use_module(library(debug)).   53:- use_module(library(error)).   54:- use_module(library(option)).   55:- use_module(library(socket)).   56:- use_module(library(thread_pool)).   57:- use_module(library(gensym)).   58:- use_module(http_wrapper).   59:- use_module(http_path).   60
   61:- autoload(library(uri), [uri_resolve/3]).   62:- autoload(library(aggregate), [aggregate_all/3]).   63
   64:- predicate_options(http_server/2, 2,
   65                     [ port(any),
   66                       unix_socket(atom),
   67                       entry_page(atom),
   68                       tcp_socket(any),
   69                       workers(positive_integer),
   70                       timeout(number),
   71                       keep_alive_timeout(number),
   72                       silent(boolean),
   73                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   74                       pass_to(system:thread_create/3, 3)
   75                     ]).   76:- predicate_options(http_spawn/2, 2,
   77                     [ pool(atom),
   78                       pass_to(system:thread_create/3, 3),
   79                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   80                     ]).   81:- predicate_options(http_add_worker/2, 2,
   82                     [ timeout(number),
   83                       keep_alive_timeout(number),
   84                       max_idle_time(number),
   85                       pass_to(system:thread_create/3, 3)
   86                     ]).

Threaded HTTP server

Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.

This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially uses a lot of memory and 8 requests that use a lot of CPU resources.

On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */

  114:- meta_predicate
  115    http_server(1, :),
  116    http_current_server(1, ?),
  117    http_spawn(0, +).  118
  119:- dynamic
  120    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  121    queue_worker/2,         % Queue, ThreadID
  122    queue_options/2.        % Queue, Options
  123
  124:- multifile
  125    make_socket_hook/3,
  126    accept_hook/2,
  127    close_hook/1,
  128    open_client_hook/6,
  129    discard_client_hook/1,
  130    http:create_pool/1,
  131    http:schedule_workers/1.  132
  133:- meta_predicate
  134    thread_repeat_wait(0).
 http_server(:Goal, :Options) is det
Create a server at Port that calls Goal for each parsed request. Options provide a list of options. Defined options are
port(?Address)
Port to bind to. Address is either a port or a term Host:Port. The port may be a variable, causing the system to select a free port. See tcp_bind/2.
unix_socket(+Path)
Instead of binding to a TCP port, bind to a Unix Domain Socket at Path.
entry_page(+URI)
Affects the message printed while the server is started. Interpreted as a URI relative to the server root.
tcp_socket(+Socket)
If provided, use this socket instead of the creating one and binding it to an address. The socket must be bound to an address.
workers(+Count)
Determine the number of worker threads. Default is 5. This is fine for small scale usage. Public servers typically need a higher number.
timeout(+Seconds)
Maximum time of inactivity trying to read the request after a connection has been opened. Default is 60 seconds. See set_stream/1 using the timeout option.
keep_alive_timeout(+Seconds)
Time to keep `Keep alive' connections alive. Default is 2 seconds.
stack_limit(+Bytes)
Stack limit to use for the workers. The default is inherited from the main thread. If you need to control resource usage you may consider the spawn option of http_handler/3 and library(thread_pool).
silent(Bool)
If true (default false), do not print an informational message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:

:- use_module(library(http/thread_httpd)).
:- use_module(library(http/http_dispatch)).

start_server(Port) :-
    http_server(http_dispatch, [port(Port)]).

Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.

  199http_server(Goal, M:Options0) :-
  200    server_address(Address, Options0),
  201    !,
  202    make_socket(Address, M:Options0, Options),
  203    create_workers(Options),
  204    create_server(Goal, Address, Options),
  205    (   option(silent(true), Options0)
  206    ->  true
  207    ;   print_message(informational,
  208                      httpd_started_server(Address, Options0))
  209    ).
  210http_server(_Goal, _:Options0) :-
  211    existence_error(server_address, Options0).
  212
  213server_address(Address, Options) :-
  214    (   option(port(Port), Options)
  215    ->  Address = Port
  216    ;   option(unix_socket(Path), Options)
  217    ->  Address = unix_socket(Path)
  218    ).
  219
  220address_port(_IFace:Port, Port) :- !.
  221address_port(unix_socket(Path), Path) :- !.
  222address_port(Address, Address) :- !.
  223
  224tcp_address(Port) :-
  225    var(Port),
  226    !.
  227tcp_address(Port) :-
  228    integer(Port),
  229    !.
  230tcp_address(_Iface:_Port).
  231
  232address_domain(localhost:_Port, Domain) =>
  233    Domain = inet.
  234address_domain(Iface:_Port, Domain) =>
  235    (   catch(ip_name(IP, Iface), error(_,_), fail),
  236        functor(IP, ip, 8)
  237    ->  Domain = inet6
  238    ;   Domain = inet
  239    ).
  240address_domain(_, Domain) =>
  241    Domain = inet.
 make_socket(+Address, :OptionsIn, -OptionsOut) is det
Create the HTTP server socket and worker pool queue. OptionsOut is quaranteed to hold the option queue(QueueId).
Arguments:
OptionsIn- is qualified to allow passing the module-sensitive ssl option argument.
  252make_socket(Address, M:Options0, Options) :-
  253    tcp_address(Address),
  254    make_socket_hook(Address, M:Options0, Options),
  255    !.
  256make_socket(Address, _:Options0, Options) :-
  257    option(tcp_socket(_), Options0),
  258    !,
  259    make_addr_atom('httpd', Address, Queue),
  260    Options = [ queue(Queue)
  261              | Options0
  262              ].
  263make_socket(Address, _:Options0, Options) :-
  264    tcp_address(Address),
  265    !,
  266    address_domain(Address, Domain),
  267    socket_create(Socket, [domain(Domain)]),
  268    tcp_setopt(Socket, reuseaddr),
  269    tcp_bind(Socket, Address),
  270    tcp_listen(Socket, 64),
  271    make_addr_atom('httpd', Address, Queue),
  272    Options = [ queue(Queue),
  273                tcp_socket(Socket)
  274              | Options0
  275              ].
  276:- if(current_predicate(unix_domain_socket/1)).  277make_socket(Address, _:Options0, Options) :-
  278    Address = unix_socket(Path),
  279    !,
  280    unix_domain_socket(Socket),
  281    tcp_bind(Socket, Path),
  282    tcp_listen(Socket, 64),
  283    make_addr_atom('httpd', Address, Queue),
  284    Options = [ queue(Queue),
  285                tcp_socket(Socket)
  286              | Options0
  287              ].
  288:- endif.
 make_addr_atom(+Scheme, +Address, -Atom) is det
Create an atom that identifies the server's queue and thread resources.
  295make_addr_atom(Scheme, Address, Atom) :-
  296    phrase(address_parts(Address), Parts),
  297    atomic_list_concat([Scheme,@|Parts], Atom).
  298
  299address_parts(Var) -->
  300    { var(Var),
  301      !,
  302      instantiation_error(Var)
  303    }.
  304address_parts(Atomic) -->
  305    { atomic(Atomic) },
  306    !,
  307    [Atomic].
  308address_parts(Host:Port) -->
  309    !,
  310    address_parts(Host), [:], address_parts(Port).
  311address_parts(ip(A,B,C,D)) -->
  312    !,
  313    [ A, '.', B, '.', C, '.', D ].
  314address_parts(unix_socket(Path)) -->
  315    [Path].
  316address_parts(Address) -->
  317    { domain_error(http_server_address, Address) }.
 create_server(:Goal, +Address, +Options) is det
Create the main server thread that runs accept_server/2 to listen to new requests.
  325create_server(Goal, Address, Options) :-
  326    get_time(StartTime),
  327    memberchk(queue(Queue), Options),
  328    scheme(Scheme, Options),
  329    autoload_https(Scheme),
  330    address_port(Address, Port),
  331    make_addr_atom(Scheme, Port, Alias),
  332    thread_self(Initiator),
  333    thread_create(accept_server(Goal, Initiator, Options), _,
  334                  [ alias(Alias)
  335                  ]),
  336    thread_get_message(server_started),
  337    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  338
  339scheme(Scheme, Options) :-
  340    option(scheme(Scheme), Options),
  341    !.
  342scheme(Scheme, Options) :-
  343    (   option(ssl(_), Options)
  344    ;   option(ssl_instance(_), Options)
  345    ),
  346    !,
  347    Scheme = https.
  348scheme(http, _).
  349
  350autoload_https(https) :-
  351    \+ clause(accept_hook(_Goal, _Options), _),
  352    exists_source(library(http/http_ssl_plugin)),
  353    !,
  354    use_module(library(http/http_ssl_plugin)).
  355autoload_https(_).
 http_current_server(:Goal, ?Port) is nondet
True if Goal is the goal of a server at Port.
deprecated
- Use http_server_property(Port, goal(Goal))
  363http_current_server(Goal, Port) :-
  364    current_server(Port, Goal, _, _, _, _).
 http_server_property(?Port, ?Property) is nondet
True if Property is a property of the HTTP server running at Port. Defined properties are:
goal(:Goal)
Goal used to start the server. This is often http_dispatch/1.
scheme(-Scheme)
Scheme is one of http or https.
start_time(?Time)
Time-stamp when the server was created.
  380http_server_property(_:Port, Property) :-
  381    integer(Port),
  382    !,
  383    server_property(Property, Port).
  384http_server_property(Port, Property) :-
  385    server_property(Property, Port).
  386
  387server_property(goal(Goal), Port) :-
  388    current_server(Port, Goal, _, _, _, _).
  389server_property(scheme(Scheme), Port) :-
  390    current_server(Port, _, _, _, Scheme, _).
  391server_property(start_time(Time), Port) :-
  392    current_server(Port, _, _, _, _, Time).
 http_workers(?Port, -Workers) is nondet
http_workers(+Port, +Workers:int) is det
Query or set the number of workers for the server at this port. The number of workers is dynamically modified. Setting it to 1 (one) can be used to profile the worker using tprofile/1.
See also
- library(http/http_dyn_workers) implements dynamic management of the worker pool depending on usage.
  405http_workers(Port, Workers) :-
  406    integer(Workers),
  407    !,
  408    must_be(ground, Port),
  409    (   current_server(Port, _, _, Queue, _, _)
  410    ->  resize_pool(Queue, Workers)
  411    ;   existence_error(http_server, Port)
  412    ).
  413http_workers(Port, Workers) :-
  414    current_server(Port, _, _, Queue, _, _),
  415    aggregate_all(count, queue_worker(Queue, _Worker), Workers).
 http_add_worker(+Port, +Options) is det
Add a new worker to the HTTP server for port Port. Options overrule the default queue options. The following additional options are processed:
max_idle_time(+Seconds)
The created worker will automatically terminate if there is no new work within Seconds.
  428http_add_worker(Port, Options) :-
  429    must_be(ground, Port),
  430    current_server(Port, _, _, Queue, _, _),
  431    !,
  432    queue_options(Queue, QueueOptions),
  433    merge_options(Options, QueueOptions, WorkerOptions),
  434    atom_concat(Queue, '_', AliasBase),
  435    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  436http_add_worker(Port, _) :-
  437    existence_error(http_server, Port).
 http_current_worker(?Port, ?ThreadID) is nondet
True if ThreadID is the identifier of a Prolog thread serving Port. This predicate is motivated to allow for the use of arbitrary interaction with the worker thread for development and statistics.
  447http_current_worker(Port, ThreadID) :-
  448    current_server(Port, _, _, Queue, _, _),
  449    queue_worker(Queue, ThreadID).
 accept_server(:Goal, +Initiator, +Options)
The goal of a small server-thread accepting new requests and posting them to the queue of workers.
  457accept_server(Goal, Initiator, Options) :-
  458    Ex = http_stop(Stopper),
  459    catch(accept_server2(Goal, Initiator, Options), Ex, true),
  460    thread_self(Thread),
  461    debug(http(stop), '[~p]: accept server received ~p', [Thread, Ex]),
  462    retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)),
  463    close_pending_accepts(Queue),
  464    close_server_socket(Options),
  465    thread_send_message(Stopper, http_stopped).
  466
  467accept_server2(Goal, Initiator, Options) :-
  468    thread_send_message(Initiator, server_started),
  469    repeat,
  470      (   catch(accept_server3(Goal, Options), E, true)
  471      ->  (   var(E)
  472          ->  fail
  473          ;   accept_rethrow_error(E)
  474          ->  throw(E)
  475          ;   print_message(error, E),
  476              fail
  477          )
  478      ;   print_message(error,      % internal error
  479                        goal_failed(accept_server3(Goal, Options))),
  480          fail
  481      ).
  482
  483accept_server3(Goal, Options) :-
  484    accept_hook(Goal, Options),
  485    !.
  486accept_server3(Goal, Options) :-
  487    memberchk(tcp_socket(Socket), Options),
  488    memberchk(queue(Queue), Options),
  489    debug(http(connection), 'Waiting for connection', []),
  490    tcp_accept(Socket, Client, Peer),
  491    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
  492    http_enough_workers(Queue, accept, Peer).
  493
  494send_to_worker(Queue, Client, Goal, Peer) :-
  495    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  496    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
  497
  498accept_rethrow_error(http_stop(_)).
  499accept_rethrow_error('$aborted').
 close_server_socket(+Options)
Close the server socket.
  506close_server_socket(Options) :-
  507    close_hook(Options),
  508    !.
  509close_server_socket(Options) :-
  510    memberchk(tcp_socket(Socket), Options),
  511    !,
  512    tcp_close_socket(Socket).
 close_pending_accepts(+Queue)
  516close_pending_accepts(Queue) :-
  517    (   thread_get_message(Queue, Msg, [timeout(0)])
  518    ->  close_client(Msg),
  519        close_pending_accepts(Queue)
  520    ;   true
  521    ).
  522
  523close_client(tcp_client(Client, _Goal, _0Peer)) =>
  524    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
  525    tcp_close_socket(Client).
  526close_client(Msg) =>
  527    (   discard_client_hook(Msg)
  528    ->  true
  529    ;   print_message(warning, http_close_client(Msg))
  530    ).
 http_stop_server(+Port, +Options)
Stop the indicated HTTP server gracefully. First stops all workers, then stops the server.
To be done
- Realise non-graceful stop
  540http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
  541    ground(Host),
  542    !,
  543    http_stop_server(Port, Options).
  544http_stop_server(Port, _Options) :-
  545    http_workers(Port, 0),                  % checks Port is ground
  546    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  547    retractall(queue_options(Queue, _)),
  548    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]),
  549    thread_self(Stopper),
  550    thread_signal(Thread, throw(http_stop(Stopper))),
  551    (   thread_get_message(Stopper, http_stopped, [timeout(0.1)])
  552    ->  true
  553    ;   catch(connect(localhost:Port), _, true)
  554    ),
  555    thread_join(Thread, _0Status),
  556    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]),
  557    message_queue_destroy(Queue).
  558
  559connect(Address) :-
  560    setup_call_cleanup(
  561        tcp_socket(Socket),
  562        tcp_connect(Socket, Address),
  563        tcp_close_socket(Socket)).
 http_enough_workers(+Queue, +Why, +Peer) is det
Check that we have enough workers in our queue. If not, call the hook http:schedule_workers/1 to extend the worker pool. This predicate can be used by accept_hook/2.
  571http_enough_workers(Queue, _Why, _Peer) :-
  572    message_queue_property(Queue, waiting(_0)),
  573    !,
  574    debug(http(scheduler), '~D waiting for work; ok', [_0]).
  575http_enough_workers(Queue, Why, Peer) :-
  576    message_queue_property(Queue, size(Size)),
  577    (   enough(Size, Why)
  578    ->  debug(http(scheduler), '~D in queue; ok', [Size])
  579    ;   current_server(Port, _, _, Queue, _, _),
  580        Data = _{ port:Port,
  581                  reason:Why,
  582                  peer:Peer,
  583                  waiting:Size,
  584                  queue:Queue
  585                },
  586        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
  587        catch(http:schedule_workers(Data),
  588              Error,
  589              print_message(error, Error))
  590    ->  true
  591    ;   true
  592    ).
  593
  594enough(0, _).
  595enough(1, keep_alive).                  % I will be ready myself
 http:schedule_workers(+Data:dict) is semidet
Hook called if a new connection or a keep-alive connection cannot be scheduled immediately to a worker. Dict contains the following keys:
port:Port
Port number that identifies the server.
reason:Reason
One of accept for a new connection or keep_alive if a worker tries to reschedule itself.
peer:Peer
Identify the other end of the connection
waiting:Size
Number of messages waiting in the queue.
queue:Queue
Message queue used to dispatch accepted messages.

Note that, when called with reason:accept, we are called in the time critical main accept loop. An implementation of this hook shall typically send the event to thread dedicated to dynamic worker-pool management.

See also
- http_add_worker/2 may be used to create (temporary) extra workers.
  625                 /*******************************
  626                 *    WORKER QUEUE OPERATIONS   *
  627                 *******************************/
 create_workers(+Options)
Create the pool of HTTP worker-threads. Each worker has the alias http_worker_N.
  634create_workers(Options) :-
  635    option(workers(N), Options, 5),
  636    option(queue(Queue), Options),
  637    catch(message_queue_create(Queue), _, true),
  638    atom_concat(Queue, '_', AliasBase),
  639    create_workers(1, N, Queue, AliasBase, Options),
  640    assert(queue_options(Queue, Options)).
  641
  642create_workers(I, N, _, _, _) :-
  643    I > N,
  644    !.
  645create_workers(I, N, Queue, AliasBase, Options) :-
  646    gensym(AliasBase, Alias),
  647    thread_create(http_worker(Options), Id,
  648                  [ alias(Alias)
  649                  | Options
  650                  ]),
  651    assertz(queue_worker(Queue, Id)),
  652    I2 is I + 1,
  653    create_workers(I2, N, Queue, AliasBase, Options).
 resize_pool(+Queue, +Workers) is det
Create or destroy workers. If workers are destroyed, the call waits until the desired number of waiters is reached.
  661resize_pool(Queue, Size) :-
  662    findall(W, queue_worker(Queue, W), Workers),
  663    length(Workers, Now),
  664    (   Now < Size
  665    ->  queue_options(Queue, Options),
  666        atom_concat(Queue, '_', AliasBase),
  667        I0 is Now+1,
  668        create_workers(I0, Size, Queue, AliasBase, Options)
  669    ;   Now == Size
  670    ->  true
  671    ;   Now > Size
  672    ->  Excess is Now - Size,
  673        thread_self(Me),
  674        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  675        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  676    ).
 http_worker(+Options)
Run HTTP worker main loop. Workers simply wait until they are passed an accepted socket to process a client.

If the message quit(Sender) is read from the queue, the worker stops.

  687http_worker(Options) :-
  688    debug(http(scheduler), 'New worker', []),
  689    prolog_listen(this_thread_exit, done_worker),
  690    option(queue(Queue), Options),
  691    option(max_idle_time(MaxIdle), Options, infinite),
  692    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
  693      debug(http(worker), 'Waiting for a job ...', []),
  694      debug(http(worker), 'Got job ~p', [Message]),
  695      (   Message = quit(Sender)
  696      ->  !,
  697          thread_self(Self),
  698          thread_detach(Self),
  699          (   Sender == idle
  700          ->  true
  701          ;   retract(queue_worker(Queue, Self)),
  702              thread_send_message(Sender, quitted(Self))
  703          )
  704      ;   open_client(Message, Queue, Goal, In, Out,
  705                      Options, ClientOptions),
  706          (   catch(http_process(Goal, In, Out, ClientOptions),
  707                    Error, true)
  708          ->  true
  709          ;   Error = goal_failed(http_process/4)
  710          ),
  711          (   var(Error)
  712          ->  fail
  713          ;   current_message_level(Error, Level),
  714              print_message(Level, Error),
  715              memberchk(peer(Peer), ClientOptions),
  716              close_connection(Peer, In, Out),
  717              fail
  718          )
  719      ).
  720
  721get_work(Queue, Message, infinite) :-
  722    !,
  723    thread_get_message(Queue, Message).
  724get_work(Queue, Message, MaxIdle) :-
  725    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  726    ->  true
  727    ;   Message = quit(idle)
  728    ).
 open_client(+Message, +Queue, -Goal, -In, -Out, +Options, -ClientOptions) is semidet
Opens the connection to the client in a worker from the message sent to the queue by accept_server/2.
  737open_client(requeue(In, Out, Goal, ClOpts),
  738            _, Goal, In, Out, Opts, ClOpts) :-
  739    !,
  740    memberchk(peer(Peer), ClOpts),
  741    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  742    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  743open_client(Message, Queue, Goal, In, Out, Opts,
  744            [ pool(client(Queue, Goal, In, Out)),
  745              timeout(Timeout)
  746            | Options
  747            ]) :-
  748    catch(open_client(Message, Goal, In, Out, Options, Opts),
  749          E, report_error(E)),
  750    option(timeout(Timeout), Opts, 60),
  751    (   debugging(http(connection))
  752    ->  memberchk(peer(Peer), Options),
  753        debug(http(connection), 'Opened connection from ~p', [Peer])
  754    ;   true
  755    ).
 open_client(+Message, +Goal, -In, -Out, -ClientOptions, +Options) is det
  761open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  762    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  763    !.
  764open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  765            [ peer(Peer),
  766              protocol(http)
  767            ], _) :-
  768    tcp_open_socket(Socket, In, Out).
  769
  770report_error(E) :-
  771    print_message(error, E),
  772    fail.
 check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet
Wait for the client for at most TimeOut seconds. Succeed if the client starts a new request within this time. Otherwise close the connection and fail.
  781check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  782    stream_property(In, timeout(Old)),
  783    set_stream(In, timeout(TMO)),
  784    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  785    catch(peek_code(In, Code), E, true),
  786    (   var(E),                     % no exception
  787        Code \== -1                 % no end-of-file
  788    ->  set_stream(In, timeout(Old)),
  789        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  790    ;   (   Code == -1
  791        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  792        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  793        ),
  794        close_connection(Peer, In, Out),
  795        fail
  796    ).
 done_worker
Called when worker is terminated due to http_workers/2 or a (debugging) exception. In the latter case, recreate_worker/2 creates a new worker.
  805done_worker :-
  806    thread_self(Self),
  807    thread_detach(Self),
  808    retract(queue_worker(Queue, Self)),
  809    thread_property(Self, status(Status)),
  810    !,
  811    (   catch(recreate_worker(Status, Queue), _, fail)
  812    ->  print_message(informational,
  813                      httpd_restarted_worker(Self))
  814    ;   done_status_message_level(Status, Level),
  815        print_message(Level,
  816                      httpd_stopped_worker(Self, Status))
  817    ).
  818done_worker :-                                  % received quit(Sender)
  819    thread_self(Self),
  820    thread_property(Self, status(Status)),
  821    done_status_message_level(Status, Level),
  822    print_message(Level,
  823                  httpd_stopped_worker(Self, Status)).
  824
  825done_status_message_level(true, silent) :- !.
  826done_status_message_level(exception('$aborted'), silent) :- !.
  827done_status_message_level(_, informational).
 recreate_worker(+Status, +Queue) is semidet
Deal with the possibility that threads are, during development, killed with abort/0. We recreate the worker to avoid that eventually we run out of workers. If we are aborted due to a halt/0 call, thread_create/3 will raise a permission error.

The first clause deals with the possibility that we cannot write to user_error. This is possible when Prolog is started as a service using some service managers. Would be nice if we could write an error, but where?

  842recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  843    halt(2).
  844recreate_worker(exception(Error), Queue) :-
  845    recreate_on_error(Error),
  846    queue_options(Queue, Options),
  847    atom_concat(Queue, '_', AliasBase),
  848    create_workers(1, 1, Queue, AliasBase, Options).
  849
  850recreate_on_error('$aborted').
  851recreate_on_error(time_limit_exceeded).
 thread_httpd:message_level(+Exception, -Level)
Determine the message stream used for exceptions that may occur during server_loop/5. Being multifile, clauses can be added by the application to refine error handling. See also message_hook/3 for further programming error handling.
  860:- multifile
  861    message_level/2.  862
  863message_level(error(io_error(read, _), _),               silent).
  864message_level(error(socket_error(epipe,_), _),           silent).
  865message_level(error(http_write_short(_Obj,_Written), _), silent).
  866message_level(error(timeout_error(read, _), _),          informational).
  867message_level(keep_alive_timeout,                        silent).
  868
  869current_message_level(Term, Level) :-
  870    (   message_level(Term, Level)
  871    ->  true
  872    ;   Level = error
  873    ).
 http_requeue(+Header)
Re-queue a connection to the worker pool. This deals with processing additional requests on keep-alive connections.
  881http_requeue(Header) :-
  882    requeue_header(Header, ClientOptions),
  883    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  884    memberchk(peer(Peer), ClientOptions),
  885    http_enough_workers(Queue, keep_alive, Peer),
  886    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  887    !.
  888http_requeue(Header) :-
  889    debug(http(error), 'Re-queue failed: ~p', [Header]),
  890    fail.
  891
  892requeue_header([], []).
  893requeue_header([H|T0], [H|T]) :-
  894    requeue_keep(H),
  895    !,
  896    requeue_header(T0, T).
  897requeue_header([_|T0], T) :-
  898    requeue_header(T0, T).
  899
  900requeue_keep(pool(_)).
  901requeue_keep(peer(_)).
  902requeue_keep(protocol(_)).
 http_process(Message, Queue, +Options)
Handle a single client message on the given stream.
  909http_process(Goal, In, Out, Options) :-
  910    debug(http(server), 'Running server goal ~p on ~p -> ~p',
  911          [Goal, In, Out]),
  912    option(timeout(TMO), Options, 60),
  913    set_stream(In, timeout(TMO)),
  914    set_stream(Out, timeout(TMO)),
  915    http_wrapper(Goal, In, Out, Connection,
  916                 [ request(Request)
  917                 | Options
  918                 ]),
  919    next(Connection, Request).
  920
  921next(Connection, Request) :-
  922    next_(Connection, Request), !.
  923next(Connection, Request) :-
  924    print_message(warning, goal_failed(next(Connection,Request))).
  925
  926next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
  927    !,
  928    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  929    (   catch(call(SwitchGoal, In, Out), E,
  930              (   print_message(error, E),
  931                  fail))
  932    ->  true
  933    ;   http_close_connection(Request)
  934    ).
  935next_(spawned(ThreadId), _) :-
  936    !,
  937    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
  938next_(Connection, Request) :-
  939    downcase_atom(Connection, 'keep-alive'),
  940    http_requeue(Request),
  941    !.
  942next_(_, Request) :-
  943    http_close_connection(Request).
 http_close_connection(+Request)
Close connection associated to Request. See also http_requeue/1.
  950http_close_connection(Request) :-
  951    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  952    memberchk(peer(Peer), Request),
  953    close_connection(Peer, In, Out).
 close_connection(+Peer, +In, +Out)
Closes the connection from the server to the client. Errors are currently silently ignored.
  960close_connection(Peer, In, Out) :-
  961    debug(http(connection), 'Closing connection from ~p', [Peer]),
  962    catch(close(In, [force(true)]), _, true),
  963    catch(close(Out, [force(true)]), _, true).
 http_spawn(:Goal, +Options) is det
Continue this connection on a new thread. A handler may call http_spawn/2 to start a new thread that continues processing the current request using Goal. The original thread returns to the worker pool for processing new requests. Options are passed to thread_create/3, except for:
pool(+Pool)
Interfaces to library(thread_pool), starting the thread on the given pool.

If a pool does not exist, this predicate calls the multifile hook http:create_pool/1 to create it. If this predicate succeeds the operation is retried.

  981http_spawn(Goal, Options) :-
  982    select_option(pool(Pool), Options, ThreadOptions),
  983    !,
  984    current_output(CGI),
  985    catch(thread_create_in_pool(Pool,
  986                                wrap_spawned(CGI, Goal), Id,
  987                                [ detached(true)
  988                                | ThreadOptions
  989                                ]),
  990          Error,
  991          true),
  992    (   var(Error)
  993    ->  http_spawned(Id)
  994    ;   Error = error(resource_error(threads_in_pool(_)), _)
  995    ->  throw(http_reply(busy))
  996    ;   Error = error(existence_error(thread_pool, Pool), _),
  997        create_pool(Pool)
  998    ->  http_spawn(Goal, Options)
  999    ;   throw(Error)
 1000    ).
 1001http_spawn(Goal, Options) :-
 1002    current_output(CGI),
 1003    thread_create(wrap_spawned(CGI, Goal), Id,
 1004                  [ detached(true)
 1005                  | Options
 1006                  ]),
 1007    http_spawned(Id).
 1008
 1009wrap_spawned(CGI, Goal) :-
 1010    set_output(CGI),
 1011    http_wrap_spawned(Goal, Request, Connection),
 1012    next(Connection, Request).
 create_pool(+Pool)
Lazy creation of worker-pools for the HTTP server. This predicate calls the hook http:create_pool/1. If the hook fails it creates a default pool of size 10. This should suffice most typical usecases. Note that we get a permission error if the pool is already created. We can ignore this.
 1022create_pool(Pool) :-
 1023    E = error(permission_error(create, thread_pool, Pool), _),
 1024    catch(http:create_pool(Pool), E, true).
 1025create_pool(Pool) :-
 1026    print_message(informational, httpd(created_pool(Pool))),
 1027    thread_pool_create(Pool, 10, []).
 1028
 1029
 1030		 /*******************************
 1031		 *         WAIT POLICIES	*
 1032		 *******************************/
 1033
 1034:- meta_predicate
 1035    thread_repeat_wait(0).
 thread_repeat_wait(:Goal) is multi
Acts as repeat, thread_idle(Goal), choosing whether to use a long or short idle time based on the average firing rate.
 1042thread_repeat_wait(Goal) :-
 1043    new_rate_mma(5, 1000, State),
 1044    repeat,
 1045      update_rate_mma(State, MMA),
 1046      long(MMA, IsLong),
 1047      (   IsLong == brief
 1048      ->  call(Goal)
 1049      ;   thread_idle(Goal, IsLong)
 1050      ).
 1051
 1052long(MMA, brief) :-
 1053    MMA < 0.05,
 1054    !.
 1055long(MMA, short) :-
 1056    MMA < 1,
 1057    !.
 1058long(_, long).
 new_rate_mma(+N, +Resolution, -State) is det
 update_rate_mma(!State, -MMA) is det
Implement Modified Moving Average computing the average time between requests as an exponential moving averate with alpha=1/N.
Arguments:
Resolution- is the time resolution in 1/Resolution seconds. All storage is done in integers to avoid the need for stack freezing in nb_setarg/3.
See also
- https://en.wikipedia.org/wiki/Moving_average
 1072new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
 1073    current_prolog_flag(max_tagged_integer, MaxI),
 1074    get_time(Base).
 1075
 1076update_rate_mma(State, MMAr) :-
 1077    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
 1078    get_time(Now),
 1079    Stamp is round((Now-Base)*Resolution),
 1080    (   Stamp > MaxI
 1081    ->  nb_setarg(1, State, Now),
 1082        nb_setarg(2, State, 0)
 1083    ;   true
 1084    ),
 1085    Diff is Stamp-Last,
 1086    nb_setarg(2, State, Stamp),
 1087    MMA is round(((N-1)*MMA0+Diff)/N),
 1088    nb_setarg(6, State, MMA),
 1089    MMAr is MMA/float(Resolution).
 1090
 1091
 1092                 /*******************************
 1093                 *            MESSAGES          *
 1094                 *******************************/
 1095
 1096:- multifile
 1097    prolog:message/3. 1098
 1099prolog:message(httpd_started_server(Port, Options)) -->
 1100    [ 'Started server at '-[] ],
 1101    http_root(Port, Options).
 1102prolog:message(httpd_stopped_worker(Self, Status)) -->
 1103    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
 1104prolog:message(httpd_restarted_worker(Self)) -->
 1105    [ 'Replaced aborted worker ~p'-[Self] ].
 1106prolog:message(httpd(created_pool(Pool))) -->
 1107    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
 1108      'Create this pool at startup-time or define the hook ', nl,
 1109      'http:create_pool/1 to avoid this message and create a ', nl,
 1110      'pool that fits the usage-profile.'
 1111    ].
 1112
 1113http_root(Address, Options) -->
 1114    { landing_page(Address, URI, Options) },
 1115    [ '~w'-[URI] ].
 1116
 1117landing_page(Host:Port, URI, Options) :-
 1118    !,
 1119    must_be(atom, Host),
 1120    must_be(integer, Port),
 1121    http_server_property(Port, scheme(Scheme)),
 1122    (   default_port(Scheme, Port)
 1123    ->  format(atom(Base), '~w://~w', [Scheme, Host])
 1124    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
 1125    ),
 1126    entry_page(Base, URI, Options).
 1127landing_page(unix_socket(Path), URI, _Options) :-
 1128    !,
 1129    format(string(URI), 'Unix domain socket "~w"', [Path]).
 1130landing_page(Port, URI, Options) :-
 1131    landing_page(localhost:Port, URI, Options).
 1132
 1133default_port(http, 80).
 1134default_port(https, 443).
 1135
 1136entry_page(Base, URI, Options) :-
 1137    option(entry_page(Entry), Options),
 1138    !,
 1139    uri_resolve(Entry, Base, URI).
 1140entry_page(Base, URI, _) :-
 1141    http_absolute_location(root(.), Entry, []),
 1142    uri_resolve(Entry, Base, URI)