View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2024, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(thread_httpd,
   39          [ http_current_server/2,      % ?:Goal, ?Port
   40            http_server_property/2,     % ?Port, ?Property
   41            http_server/2,              % :Goal, +Options
   42            http_workers/2,             % +Port, ?WorkerCount
   43            http_add_worker/2,          % +Port, +Options
   44            http_current_worker/2,      % ?Port, ?ThreadID
   45            http_stop_server/2,         % +Port, +Options
   46            http_spawn/2,               % :Goal, +Options
   47
   48            http_requeue/1,             % +Request
   49            http_close_connection/1,    % +Request
   50            http_enough_workers/3       % +Queue, +Why, +Peer
   51          ]).   52:- use_module(library(debug)).   53:- use_module(library(error)).   54:- use_module(library(option)).   55:- use_module(library(socket)).   56:- use_module(library(thread_pool)).   57:- use_module(library(gensym)).   58:- use_module(http_wrapper).   59:- use_module(http_path).   60
   61:- autoload(library(uri), [uri_resolve/3]).   62:- autoload(library(aggregate), [aggregate_all/3]).   63
   64:- predicate_options(http_server/2, 2,
   65                     [ port(any),
   66                       unix_socket(atom),
   67                       entry_page(atom),
   68                       tcp_socket(any),
   69                       workers(positive_integer),
   70                       timeout(number),
   71                       keep_alive_timeout(number),
   72                       silent(boolean),
   73                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
   74                       pass_to(system:thread_create/3, 3)
   75                     ]).   76:- predicate_options(http_spawn/2, 2,
   77                     [ pool(atom),
   78                       pass_to(system:thread_create/3, 3),
   79                       pass_to(thread_pool:thread_create_in_pool/4, 4)
   80                     ]).   81:- predicate_options(http_add_worker/2, 2,
   82                     [ timeout(number),
   83                       keep_alive_timeout(number),
   84                       max_idle_time(number),
   85                       pass_to(system:thread_create/3, 3)
   86                     ]).   87
   88/** <module> Threaded HTTP server
   89
   90Most   code   doesn't   need  to   use  this   directly;  instead   use
   91library(http/http_server),  which  combines   this  library  with   the
   92typical HTTP libraries that most servers need.
   93
   94This library defines the HTTP server  frontend of choice for SWI-Prolog.
   95It is based on the multi-threading   capabilities of SWI-Prolog and thus
   96exploits multiple cores  to  serve   requests  concurrently.  The server
   97scales well and can cooperate with   library(thread_pool) to control the
   98number of concurrent requests of a given   type.  For example, it can be
   99configured to handle 200 file download requests concurrently, 2 requests
   100that potentially use a lot of memory and   8 requests that use a lot of
  101CPU resources.
  102
  103On   Unix   systems,    this    library     can    be    combined   with
  104library(http/http_unix_daemon) to realise a proper  Unix service process
  105that creates a web server at  port   80,  runs under a specific account,
  106optionally detaches from the controlling terminal, etc.
  107
  108Combined with library(http/http_ssl_plugin) from the   SSL package, this
  109library   can   be   used   to    create     an    HTTPS   server.   See
  110<plbase>/doc/packages/examples/ssl/https for an example   server using a
  111self-signed SSL certificate.
  112*/
  113
  114:- meta_predicate
  115    http_server(1, :),
  116    http_current_server(1, ?),
  117    http_spawn(0, +).  118
  119:- dynamic
  120    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
  121    queue_worker/2,         % Queue, ThreadID
  122    queue_options/2.        % Queue, Options
  123
  124:- multifile
  125    make_socket_hook/3,
  126    accept_hook/2,
  127    close_hook/1,
  128    open_client_hook/6,
  129    discard_client_hook/1,
  130    http:create_pool/1,
  131    http:schedule_workers/1.  132
  133:- meta_predicate
  134    thread_repeat_wait(0).  135
  136%!  http_server(:Goal, :Options) is det.
  137%
  138%   Create a server at Port that calls Goal for each parsed request.
  139%   Options provide a list of options. Defined options are
  140%
  141%     * port(?Address)
  142%     Port to bind to.  Address is either a port or a term
  143%     Host:Port. The port may be a variable, causing the system
  144%     to select a free port.  See tcp_bind/2.
  145%
  146%     * unix_socket(+Path)
  147%     Instead of binding to a TCP port, bind to a _Unix Domain
  148%     Socket_ at Path.
  149%
  150%     * entry_page(+URI)
  151%     Affects the message printed while the server is started.
  152%     Interpreted as a URI relative to the server root.
  153%
  154%     * tcp_socket(+Socket)
  155%     If provided, use this socket instead of the creating one and
  156%     binding it to an address.  The socket must be bound to an
  157%     address.  Note that this also allows binding an HTTP server to
  158%     a Unix domain socket (``AF_UNIX``).  See socket_create/2.
  159%
  160%     * workers(+Count)
  161%     Determine the number of worker threads.  Default is 5.  This
  162%     is fine for small scale usage.  Public servers typically need
  163%     a higher number.
  164%
  165%     * timeout(+Seconds)
  166%     Maximum time of inactivity trying to read the request after a
  167%     connection has been opened.  Default is 60 seconds.  See
  168%     set_stream/1 using the _timeout_ option.
  169%
  170%     * keep_alive_timeout(+Seconds)
  171%     Time to keep `Keep alive' connections alive.  Default is
  172%     2 seconds.
  173%
  174%     * stack_limit(+Bytes)
  175%     Stack limit to use for the workers.  The default is inherited
  176%     from the `main` thread.
  177%     If you need to control resource usage you may consider the
  178%     `spawn` option of http_handler/3 and library(thread_pool).
  179%
  180%     * silent(Bool)
  181%     If `true` (default `false`), do not print an informational
  182%     message that the server was started.
  183%
  184%   A  typical  initialization  for  an    HTTP   server  that  uses
  185%   http_dispatch/1 to relay requests to predicates is:
  186%
  187%     ==
  188%     :- use_module(library(http/thread_httpd)).
  189%     :- use_module(library(http/http_dispatch)).
  190%
  191%     start_server(Port) :-
  192%         http_server(http_dispatch, [port(Port)]).
  193%     ==
  194%
  195%   Note that multiple servers  can  coexist   in  the  same  Prolog
  196%   process. A notable application of this is   to have both an HTTP
  197%   and HTTPS server, where the HTTP   server redirects to the HTTPS
  198%   server for handling sensitive requests.
  199
  200http_server(Goal, M:Options0) :-
  201    server_address(Address, Options0),
  202    !,
  203    make_socket(Address, M:Options0, Options),
  204    create_workers(Options),
  205    create_server(Goal, Address, Options),
  206    (   option(silent(true), Options0)
  207    ->  true
  208    ;   print_message(informational,
  209                      httpd_started_server(Address, Options0))
  210    ).
  211http_server(_Goal, _:Options0) :-
  212    existence_error(server_address, Options0).
  213
  214server_address(Address, Options) :-
  215    (   option(port(Port), Options)
  216    ->  Address = Port
  217    ;   option(unix_socket(Path), Options)
  218    ->  Address = unix_socket(Path)
  219    ).
  220
  221address_port(_IFace:Port, Port) :- !.
  222address_port(unix_socket(Path), Path) :- !.
  223address_port(Address, Address) :- !.
  224
  225tcp_address(Port) :-
  226    var(Port),
  227    !.
  228tcp_address(Port) :-
  229    integer(Port),
  230    !.
  231tcp_address(_Iface:_Port).
  232
  233address_domain(localhost:_Port, Domain) =>
  234    Domain = inet.
  235address_domain(Iface:_Port, Domain) =>
  236    (   catch(ip_name(IP, Iface), error(_,_), fail),
  237        functor(IP, ip, 8)
  238    ->  Domain = inet6
  239    ;   Domain = inet
  240    ).
  241address_domain(_, Domain) =>
  242    Domain = inet.
  243
  244
  245%!  make_socket(+Address, :OptionsIn, -OptionsOut) is det.
  246%
  247%   Create the HTTP server socket and  worker pool queue. OptionsOut
  248%   is quaranteed to hold the option queue(QueueId).
  249%
  250%   @arg   OptionsIn   is   qualified   to     allow   passing   the
  251%   module-sensitive ssl option argument.
  252
  253make_socket(Address, M:Options0, Options) :-
  254    tcp_address(Address),
  255    make_socket_hook(Address, M:Options0, Options),
  256    !.
  257make_socket(Address, _:Options0, Options) :-
  258    option(tcp_socket(_), Options0),
  259    !,
  260    make_addr_atom('httpd', Address, Queue),
  261    Options = [ queue(Queue)
  262              | Options0
  263              ].
  264make_socket(Address, _:Options0, Options) :-
  265    tcp_address(Address),
  266    !,
  267    address_domain(Address, Domain),
  268    socket_create(Socket, [domain(Domain)]),
  269    tcp_setopt(Socket, reuseaddr),
  270    tcp_bind(Socket, Address),
  271    tcp_listen(Socket, 64),
  272    make_addr_atom('httpd', Address, Queue),
  273    Options = [ queue(Queue),
  274                tcp_socket(Socket)
  275              | Options0
  276              ].
  277:- if(current_predicate(unix_domain_socket/1)).  278make_socket(Address, _:Options0, Options) :-
  279    Address = unix_socket(Path),
  280    !,
  281    unix_domain_socket(Socket),
  282    tcp_bind(Socket, Path),
  283    tcp_listen(Socket, 64),
  284    make_addr_atom('httpd', Address, Queue),
  285    Options = [ queue(Queue),
  286                tcp_socket(Socket)
  287              | Options0
  288              ].
  289:- endif.  290
  291%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
  292%
  293%   Create an atom that identifies  the   server's  queue and thread
  294%   resources.
  295
  296make_addr_atom(Scheme, Address, Atom) :-
  297    phrase(address_parts(Address), Parts),
  298    atomic_list_concat([Scheme,@|Parts], Atom).
  299
  300address_parts(Var) -->
  301    { var(Var),
  302      !,
  303      instantiation_error(Var)
  304    }.
  305address_parts(Atomic) -->
  306    { atomic(Atomic) },
  307    !,
  308    [Atomic].
  309address_parts(Host:Port) -->
  310    !,
  311    address_parts(Host), [:], address_parts(Port).
  312address_parts(ip(A,B,C,D)) -->
  313    !,
  314    [ A, '.', B, '.', C, '.', D ].
  315address_parts(unix_socket(Path)) -->
  316    [Path].
  317address_parts(Address) -->
  318    { domain_error(http_server_address, Address) }.
  319
  320
  321%!  create_server(:Goal, +Address, +Options) is det.
  322%
  323%   Create the main server thread that runs accept_server/2 to
  324%   listen to new requests.
  325
  326create_server(Goal, Address, Options) :-
  327    get_time(StartTime),
  328    memberchk(queue(Queue), Options),
  329    scheme(Scheme, Options),
  330    autoload_https(Scheme),
  331    address_port(Address, Port),
  332    make_addr_atom(Scheme, Port, Alias),
  333    thread_self(Initiator),
  334    thread_create(accept_server(Goal, Initiator, Options), _,
  335                  [ alias(Alias)
  336                  ]),
  337    thread_get_message(server_started),
  338    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  339
  340scheme(Scheme, Options) :-
  341    option(scheme(Scheme), Options),
  342    !.
  343scheme(Scheme, Options) :-
  344    (   option(ssl(_), Options)
  345    ;   option(ssl_instance(_), Options)
  346    ),
  347    !,
  348    Scheme = https.
  349scheme(http, _).
  350
  351autoload_https(https) :-
  352    \+ clause(accept_hook(_Goal, _Options), _),
  353    exists_source(library(http/http_ssl_plugin)),
  354    !,
  355    use_module(library(http/http_ssl_plugin)).
  356autoload_https(_).
  357
  358%!  http_current_server(:Goal, ?Port) is nondet.
  359%
  360%   True if Goal is the goal of a server at Port.
  361%
  362%   @deprecated Use http_server_property(Port, goal(Goal))
  363
  364http_current_server(Goal, Port) :-
  365    current_server(Port, Goal, _, _, _, _).
  366
  367
  368%!  http_server_property(?Port, ?Property) is nondet.
  369%
  370%   True if Property is a property of the HTTP server running at
  371%   Port.  Defined properties are:
  372%
  373%       * goal(:Goal)
  374%       Goal used to start the server. This is often
  375%       http_dispatch/1.
  376%       * scheme(-Scheme)
  377%       Scheme is one of `http` or `https`.
  378%       * start_time(?Time)
  379%       Time-stamp when the server was created.
  380
  381http_server_property(_:Port, Property) :-
  382    integer(Port),
  383    !,
  384    server_property(Property, Port).
  385http_server_property(Port, Property) :-
  386    server_property(Property, Port).
  387
  388server_property(goal(Goal), Port) :-
  389    current_server(Port, Goal, _, _, _, _).
  390server_property(scheme(Scheme), Port) :-
  391    current_server(Port, _, _, _, Scheme, _).
  392server_property(start_time(Time), Port) :-
  393    current_server(Port, _, _, _, _, Time).
  394
  395
  396%!  http_workers(?Port, -Workers) is nondet.
  397%!  http_workers(+Port, +Workers:int) is det.
  398%
  399%   Query or set the number of workers for  the server at this port. The
  400%   number of workers is dynamically modified. Setting it to 1 (one) can
  401%   be used to profile the worker using tprofile/1.
  402%
  403%   @see library(http/http_dyn_workers) implements dynamic management of
  404%   the worker pool depending on usage.
  405
  406http_workers(Port, Workers) :-
  407    integer(Workers),
  408    !,
  409    must_be(ground, Port),
  410    (   current_server(Port, _, _, Queue, _, _)
  411    ->  resize_pool(Queue, Workers)
  412    ;   existence_error(http_server, Port)
  413    ).
  414http_workers(Port, Workers) :-
  415    current_server(Port, _, _, Queue, _, _),
  416    aggregate_all(count, queue_worker(Queue, _Worker), Workers).
  417
  418
  419%!  http_add_worker(+Port, +Options) is det.
  420%
  421%   Add a new worker to  the  HTTP   server  for  port Port. Options
  422%   overrule the default queue  options.   The  following additional
  423%   options are processed:
  424%
  425%     - max_idle_time(+Seconds)
  426%     The created worker will automatically terminate if there is
  427%     no new work within Seconds.
  428
  429http_add_worker(Port, Options) :-
  430    must_be(ground, Port),
  431    current_server(Port, _, _, Queue, _, _),
  432    !,
  433    queue_options(Queue, QueueOptions),
  434    merge_options(Options, QueueOptions, WorkerOptions),
  435    atom_concat(Queue, '_', AliasBase),
  436    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  437http_add_worker(Port, _) :-
  438    existence_error(http_server, Port).
  439
  440
  441%!  http_current_worker(?Port, ?ThreadID) is nondet.
  442%
  443%   True if ThreadID is the identifier   of  a Prolog thread serving
  444%   Port. This predicate is  motivated  to   allow  for  the  use of
  445%   arbitrary interaction with the worker thread for development and
  446%   statistics.
  447
  448http_current_worker(Port, ThreadID) :-
  449    current_server(Port, _, _, Queue, _, _),
  450    queue_worker(Queue, ThreadID).
  451
  452
   %!  accept_server(:Goal, +Initiator, +Options)
   %
   %   The goal of a small server-thread accepting new requests and
   %   posting them to the queue of workers.
   %
   %   Runs accept_server2/3 until it is interrupted by the exception
   %   http_stop(Stopper).  It then removes the current_server/6
   %   registration of this thread, closes connections still pending
   %   in the queue, closes the server socket and finally sends
   %   `http_stopped` to Stopper.  Other exceptions are not caught
   %   here.

   accept_server(Goal, Initiator, Options) :-
       Ex = http_stop(Stopper),
       catch(accept_server2(Goal, Initiator, Options), Ex, true),
       thread_self(Thread),
       debug(http(stop), '[~p]: accept server received ~p', [Thread, Ex]),
       % Unregister first so the server is no longer visible while
       % the remaining resources are released.
       retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)),
       close_pending_accepts(Queue),
       close_server_socket(Options),
       thread_send_message(Stopper, http_stopped).
  467
   %   accept_server2(:Goal, +Initiator, +Options)
   %
   %   Tell Initiator that the server has started and then run
   %   accept_server3/2 in a failure-driven `repeat` loop.  The loop
   %   never succeeds; it is only left by an exception for which
   %   accept_rethrow_error/1 holds.  Other exceptions, and failure
   %   of accept_server3/2, are printed as errors and the loop
   %   continues.

   accept_server2(Goal, Initiator, Options) :-
       thread_send_message(Initiator, server_started),
       repeat,
         (   catch(accept_server3(Goal, Options), E, true)
         ->  (   var(E)                     % accepted normally: next round
             ->  fail
             ;   accept_rethrow_error(E)    % stop request: leave the loop
             ->  throw(E)
             ;   print_message(error, E),   % report and keep accepting
                 fail
             )
         ;   print_message(error,      % internal error
                           goal_failed(accept_server3(Goal, Options))),
             fail
         ).
  483
  484accept_server3(Goal, Options) :-
  485    accept_hook(Goal, Options),
  486    !.
  487accept_server3(Goal, Options) :-
  488    memberchk(tcp_socket(Socket), Options),
  489    memberchk(queue(Queue), Options),
  490    debug(http(connection), 'Waiting for connection', []),
  491    tcp_accept(Socket, Client, Peer),
  492    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
  493    http_enough_workers(Queue, accept, Peer).
  494
  495send_to_worker(Queue, Client, Goal, Peer) :-
  496    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  497    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
  498
  499accept_rethrow_error(http_stop(_)).
  500
  501%!  close_server_socket(+Options)
  502%
  503%   Close the server socket.
  504
  505close_server_socket(Options) :-
  506    close_hook(Options),
  507    !.
  508close_server_socket(Options) :-
  509    memberchk(tcp_socket(Socket), Options),
  510    !,
  511    tcp_close_socket(Socket).
  512
  513%!  close_pending_accepts(+Queue)
  514
  515close_pending_accepts(Queue) :-
  516    (   thread_get_message(Queue, Msg, [timeout(0)])
  517    ->  close_client(Msg),
  518        close_pending_accepts(Queue)
  519    ;   true
  520    ).
  521
  522close_client(tcp_client(Client, _Goal, _0Peer)) =>
  523    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
  524    tcp_close_socket(Client).
  525close_client(Msg) =>
  526    (   discard_client_hook(Msg)
  527    ->  true
  528    ;   print_message(warning, http_close_client(Msg))
  529    ).
  530
  531
   %!  http_stop_server(+Port, +Options)
   %
   %   Stop the indicated  HTTP  server   gracefully.  First  stops all
   %   workers, then stops the server.
   %
   %   Port may be given as Host:Port with a ground Host; the host
   %   part is then ignored.  Options is currently not inspected.
   %
   %   @error existence_error(http_server, Port) (from http_workers/2)
   %          if there is no server at Port.
   %   @tbd    Realise non-graceful stop

   http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
       ground(Host),
       !,
       http_stop_server(Port, Options).
   http_stop_server(Port, _Options) :-
       http_workers(Port, 0),                  % checks Port is ground
       current_server(Port, _, Thread, Queue, _Scheme, _Start),
       retractall(queue_options(Queue, _)),
       debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]),
       thread_self(Stopper),
       % Interrupt the accept thread; accept_server/3 answers with
       % `http_stopped` once it has cleaned up.
       thread_signal(Thread, throw(http_stop(Stopper))),
       (   thread_get_message(Stopper, http_stopped, [timeout(0.1)])
       ->  true
       % NOTE(review): presumably the dummy connection wakes an accept
       % thread that is blocked in tcp_accept/3 so it can process the
       % signal -- confirm.  Connection errors are ignored.
       ;   catch(connect(localhost:Port), _, true)
       ),
       thread_join(Thread, _0Status),
       debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]),
       message_queue_destroy(Queue).
  557
  558connect(Address) :-
  559    setup_call_cleanup(
  560        tcp_socket(Socket),
  561        tcp_connect(Socket, Address),
  562        tcp_close_socket(Socket)).
  563
   %!  http_enough_workers(+Queue, +Why, +Peer) is det.
   %
   %   Check that we have enough workers in our queue. If not, call the
   %   hook http:schedule_workers/1 to extend  the   worker  pool. This
   %   predicate can be used by accept_hook/2.
   %
   %   The pool is considered sufficient if the queue reports a
   %   waiting(_) property, or if the number of queued messages
   %   satisfies enough/2 for Why.  Otherwise the hook is called with a
   %   dict holding the keys `port`, `reason`, `peer`, `waiting` and
   %   `queue`.  Exceptions from the hook are printed as errors and
   %   failure of the hook is ignored.

   http_enough_workers(Queue, _Why, _Peer) :-
       message_queue_property(Queue, waiting(_0)),
       !,
       debug(http(scheduler), '~D waiting for work; ok', [_0]).
   http_enough_workers(Queue, Why, Peer) :-
       message_queue_property(Queue, size(Size)),
       (   enough(Size, Why)
       ->  debug(http(scheduler), '~D in queue; ok', [Size])
       % Second condition of the chain: look up the server and ask the
       % hook to reschedule.  The final `; true` makes the predicate
       % succeed even if the lookup or the hook fails.
       ;   current_server(Port, _, _, Queue, _, _),
           Data = _{ port:Port,
                     reason:Why,
                     peer:Peer,
                     waiting:Size,
                     queue:Queue
                   },
           debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
           catch(http:schedule_workers(Data),
                 Error,
                 print_message(error, Error))
       ->  true
       ;   true
       ).
  592
  593enough(0, _).
  594enough(1, keep_alive).                  % I will be ready myself
  595
  596
  597%!  http:schedule_workers(+Data:dict) is semidet.
  598%
  599%   Hook called if a  new  connection   or  a  keep-alive connection
  600%   cannot be scheduled _immediately_ to a worker. Dict contains the
  601%   following keys:
  602%
  603%     - port:Port
  604%     Port number that identifies the server.
  605%     - reason:Reason
  606%     One of =accept= for a new connection or =keep_alive= if a
  607%     worker tries to reschedule itself.
  608%     - peer:Peer
  609%     Identify the other end of the connection
  610%     - waiting:Size
  611%     Number of messages waiting in the queue.
  612%     - queue:Queue
  613%     Message queue used to dispatch accepted messages.
  614%
  615%   Note that, when called with `reason:accept`,   we  are called in
  616%   the time critical main accept loop.   An  implementation of this
   617%   hook shall typically send  the  event   to  a thread  dedicated to
  618%   dynamic worker-pool management.
  619%
  620%   @see    http_add_worker/2 may be used to create (temporary) extra
  621%           workers.
  622
  623
  624                 /*******************************
  625                 *    WORKER QUEUE OPERATIONS   *
  626                 *******************************/
  627
  628%!  create_workers(+Options)
  629%
  630%   Create the pool of HTTP worker-threads. Each worker has the
  631%   alias http_worker_N.
  632
  633create_workers(Options) :-
  634    option(workers(N), Options, 5),
  635    option(queue(Queue), Options),
  636    catch(message_queue_create(Queue), _, true),
  637    atom_concat(Queue, '_', AliasBase),
  638    create_workers(1, N, Queue, AliasBase, Options),
  639    assert(queue_options(Queue, Options)).
  640
  641create_workers(I, N, _, _, _) :-
  642    I > N,
  643    !.
  644create_workers(I, N, Queue, AliasBase, Options) :-
  645    gensym(AliasBase, Alias),
  646    thread_create(http_worker(Options), Id,
  647                  [ alias(Alias)
  648                  | Options
  649                  ]),
  650    assertz(queue_worker(Queue, Id)),
  651    I2 is I + 1,
  652    create_workers(I2, N, Queue, AliasBase, Options).
  653
  654
  655%!  resize_pool(+Queue, +Workers) is det.
  656%
  657%   Create or destroy workers. If workers   are  destroyed, the call
  658%   waits until the desired number of waiters is reached.
  659
  660resize_pool(Queue, Size) :-
  661    findall(W, queue_worker(Queue, W), Workers),
  662    length(Workers, Now),
  663    (   Now < Size
  664    ->  queue_options(Queue, Options),
  665        atom_concat(Queue, '_', AliasBase),
  666        I0 is Now+1,
  667        create_workers(I0, Size, Queue, AliasBase, Options)
  668    ;   Now == Size
  669    ->  true
  670    ;   Now > Size
  671    ->  Excess is Now - Size,
  672        thread_self(Me),
  673        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  674        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  675    ).
  676
  677
   %!  http_worker(+Options)
   %
   %   Run HTTP worker main loop. Workers   simply  wait until they are
   %   passed an accepted socket to process  a client.
   %
   %   If the message quit(Sender) is read   from the queue, the worker
   %   stops.
   %
   %   Options must contain queue(Queue).  The option
   %   max_idle_time(Seconds) (default `infinite`) limits how long the
   %   worker waits for a message; see get_work/3.  done_worker/0 is
   %   registered to run when the thread exits.

   http_worker(Options) :-
       debug(http(scheduler), 'New worker', []),
       prolog_listen(this_thread_exit, done_worker),
       option(queue(Queue), Options),
       option(max_idle_time(MaxIdle), Options, infinite),
       % Failure-driven loop: each `fail` below returns here to fetch
       % the next message.
       thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
         % Note: both messages are printed after a message arrived.
         debug(http(worker), 'Waiting for a job ...', []),
         debug(http(worker), 'Got job ~p', [Message]),
         (   Message = quit(Sender)
         ->  !,                             % leave the loop
             thread_self(Self),
             thread_detach(Self),
             (   Sender == idle             % idle timeout: nobody to notify
             ->  true
             ;   retract(queue_worker(Queue, Self)),
                 thread_send_message(Sender, quitted(Self))
             )
         ;   open_client(Message, Queue, Goal, In, Out,
                         Options, ClientOptions),
             (   catch(http_process(Goal, In, Out, ClientOptions),
                       Error, true)
             ->  true
             ;   Error = goal_failed(http_process/4)
             ),
             (   var(Error)                 % request handled: next message
             ->  fail
             ;   current_message_level(Error, Level),
                 print_message(Level, Error),
                 memberchk(peer(Peer), ClientOptions),
                 close_connection(Peer, In, Out),
                 fail
             )
         ).
  719
  720get_work(Queue, Message, infinite) :-
  721    !,
  722    thread_get_message(Queue, Message).
  723get_work(Queue, Message, MaxIdle) :-
  724    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  725    ->  true
  726    ;   Message = quit(idle)
  727    ).
  728
  729
  730%!  open_client(+Message, +Queue, -Goal, -In, -Out,
  731%!              +Options, -ClientOptions) is semidet.
  732%
  733%   Opens the connection to the client in a worker from the message
  734%   sent to the queue by accept_server/2.
  735
  736open_client(requeue(In, Out, Goal, ClOpts),
  737            _, Goal, In, Out, Opts, ClOpts) :-
  738    !,
  739    memberchk(peer(Peer), ClOpts),
  740    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  741    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  742open_client(Message, Queue, Goal, In, Out, Opts,
  743            [ pool(client(Queue, Goal, In, Out)),
  744              timeout(Timeout)
  745            | Options
  746            ]) :-
  747    catch(open_client(Message, Goal, In, Out, Options, Opts),
  748          E, report_error(E)),
  749    option(timeout(Timeout), Opts, 60),
  750    (   debugging(http(connection))
  751    ->  memberchk(peer(Peer), Options),
  752        debug(http(connection), 'Opened connection from ~p', [Peer])
  753    ;   true
  754    ).
  755
  756
  757%!  open_client(+Message, +Goal, -In, -Out,
  758%!              -ClientOptions, +Options) is det.
  759
  760open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  761    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  762    !.
  763open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  764            [ peer(Peer),
  765              protocol(http)
  766            ], _) :-
  767    tcp_open_socket(Socket, In, Out).
  768
  769report_error(E) :-
  770    print_message(error, E),
  771    fail.
  772
  773
   %!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
   %
   %   Wait for the client for at most  TimeOut seconds. Succeed if the
   %   client starts a new request within   this  time. Otherwise close
   %   the connection and fail.
   %
   %   The original timeout of In is restored on success.

   check_keep_alive_connection(In, TMO, Peer, In, Out) :-
       stream_property(In, timeout(Old)),
       set_stream(In, timeout(TMO)),
       debug(http(keep_alive), 'Waiting for keep-alive ...', []),
       % Peek without consuming input.  Any exception leaves Code
       % unbound and E bound.
       catch(peek_code(In, Code), E, true),
       (   var(E),                     % no exception
           Code \== -1                 % no end-of-file
       ->  set_stream(In, timeout(Old)),
           debug(http(keep_alive), '\tre-using keep-alive connection', [])
       ;   (   Code == -1
           ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
           % Every exception is reported as a timeout here.
           ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
           ),
           close_connection(Peer, In, Out),
           fail
       ).
  796
  797
   %!  done_worker
   %
   %   Called when worker is terminated  due   to  http_workers/2  or a
   %   (debugging) exception. In  the   latter  case, recreate_worker/2
   %   creates a new worker.
   %
   %   The first clause applies while the thread is still registered
   %   in queue_worker/2; it unregisters the thread and tries to
   %   replace it.  The second clause applies when the registration
   %   was already removed and only reports the exit status.

   done_worker :-
       thread_self(Self),
       thread_detach(Self),
       retract(queue_worker(Queue, Self)),
       thread_property(Self, status(Status)),
       !,
       % Errors while recreating the worker are treated as "not
       % recreated".
       (   catch(recreate_worker(Status, Queue), _, fail)
       ->  print_message(informational,
                         httpd_restarted_worker(Self))
       ;   done_status_message_level(Status, Level),
           print_message(Level,
                         httpd_stopped_worker(Self, Status))
       ).
   done_worker :-                                  % received quit(Sender)
       thread_self(Self),
       thread_property(Self, status(Status)),
       done_status_message_level(Status, Level),
       print_message(Level,
                     httpd_stopped_worker(Self, Status)).
  823
  824done_status_message_level(true, silent) :- !.
  825done_status_message_level(exception('$aborted'), silent) :- !.
  826done_status_message_level(exception(unwind(abort)), silent) :- !.
  827done_status_message_level(exception(unwind(halt(_))), silent) :- !.
  828done_status_message_level(_, informational).
  829
  830
  %!  recreate_worker(+Status, +Queue) is semidet.
  %
  %   Start a replacement worker on Queue if the exit Status of the
  %   terminated worker is an exception accepted by recreate_on_error/1.
  %   During development threads may be killed using abort/0; replacing
  %   them avoids that we eventually run out of workers. If the abort is
  %   caused by a halt/0 call, thread_create/3 raises a permission
  %   error.
  %
  %   The first clause covers the case that we cannot write to
  %   `user_error`, which is possible when Prolog is started as a
  %   service by some service managers. We would like to report an
  %   error, but there is no place to write it, so we halt with
  %   status 2.

  recreate_worker(exception(Reason), _Queue) :-
      Reason = error(io_error(write,user_error),_),
      halt(2).
  recreate_worker(exception(Reason), Queue) :-
      recreate_on_error(Reason),
      queue_options(Queue, WorkerOptions),
      atom_concat(Queue, '_', AliasPrefix),
      create_workers(1, 1, Queue, AliasPrefix, WorkerOptions).
  850
  %   recreate_on_error(?Error)
  %
  %   True when Error is an exception term after which a terminated
  %   worker is replaced by recreate_worker/2.

  recreate_on_error(Error) :-
      (   Error = '$aborted'
      ;   Error = unwind(abort)
      ;   Error = time_limit_exceeded
      ).
  854
  %!  thread_httpd:message_level(+Exception, -Level)
  %
  %   Table that selects the message level used to report exceptions
  %   that may occur during server_loop/5. The predicate is multifile,
  %   so the application can add clauses to refine error handling.
  %   Exceptions without a matching clause are handled by
  %   current_message_level/2. See also message_hook/3 for further
  %   programming of error handling.

  :- multifile message_level/2.

  message_level(error(io_error(read, _), _),        silent).
  message_level(error(socket_error(epipe, _), _),   silent).
  message_level(error(http_write_short(_, _), _),   silent).
  message_level(error(timeout_error(read, _), _),   informational).
  message_level(keep_alive_timeout,                 silent).
  870
  %   current_message_level(+Term, -Level)
  %
  %   Level is the first level message_level/2 gives for Term, or
  %   `error` if message_level/2 has no answer for Term.

  current_message_level(Term, Level) :-
      message_level(Term, Level),
      !.
  current_message_level(_, error).
  876
  877
  %!  http_requeue(+Header)
  %
  %   Hand a connection back to the worker pool so that further
  %   requests on a keep-alive connection can be processed. Only the
  %   fields selected by requeue_header/2 travel with the requeue/4
  %   message. If any step fails, a debug message is printed on the
  %   topic http(error) and the predicate fails.

  http_requeue(Header) :-
      (   requeue_header(Header, Kept),
          memberchk(pool(client(Queue, Goal, In, Out)), Kept),
          memberchk(peer(Peer), Kept),
          http_enough_workers(Queue, keep_alive, Peer),
          thread_send_message(Queue, requeue(In, Out, Goal, Kept))
      ->  true
      ;   debug(http(error), 'Re-queue failed: ~p', [Header]),
          fail
      ).
  893
  %   requeue_header(+Header, -Kept)
  %
  %   Kept holds, in their original order, the elements of Header that
  %   are accepted by requeue_keep/1.

  requeue_header([], []).
  requeue_header([Field|Fields], Kept) :-
      (   requeue_keep(Field)
      ->  Kept = [Field|KeptRest]
      ;   Kept = KeptRest
      ),
      requeue_header(Fields, KeptRest).
  901
  %   requeue_keep(?Field)
  %
  %   True when Field is a request field that is preserved when a
  %   connection is re-queued.

  requeue_keep(Field) :-
      (   Field = pool(_)
      ;   Field = peer(_)
      ;   Field = protocol(_)
      ).
  905
  906
  %!  http_process(:Goal, +In, +Out, +Options)
  %
  %   Handle a single client message arriving on the stream pair
  %   In/Out. Both streams get the timeout from the option
  %   timeout(Seconds) (default 60). The request is then processed
  %   by http_wrapper/5, after which next/2 decides what happens to
  %   the connection.

  http_process(Goal, In, Out, Options) :-
      debug(http(server), 'Running server goal ~p on ~p -> ~p',
            [Goal, In, Out]),
      option(timeout(Timeout), Options, 60),
      set_stream(In, timeout(Timeout)),
      set_stream(Out, timeout(Timeout)),
      WrapperOptions = [request(Request)|Options],
      http_wrapper(Goal, In, Out, Connection, WrapperOptions),
      next(Connection, Request).
  922
  %   next(+Connection, +Request)
  %
  %   Run next_/2 to decide what to do with the connection after a
  %   request was handled. Prints a warning instead of failing if
  %   next_/2 fails.

  next(Connection, Request) :-
      (   next_(Connection, Request)
      ->  true
      ;   print_message(warning, goal_failed(next(Connection,Request)))
      ).
  927
  %   next_(+Connection, +Request)
  %
  %   Act on the Connection result of handling Request:
  %
  %     - switch_protocol(Goal, _): call Goal on the connection streams.
  %       If Goal fails or raises an exception (which is printed), the
  %       connection is closed.
  %     - spawned(ThreadId): the connection now belongs to another
  %       thread; only emit a debug message.
  %     - Otherwise, re-queue the connection if Connection is
  %       `keep-alive` (case insensitive) and http_requeue/1 succeeds,
  %       or else close it.

  next_(switch_protocol(Switch, _SwitchOptions), Request) :-
      !,
      memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
      (   catch(call(Switch, In, Out),
                Error,
                ( print_message(error, Error), fail ))
      ->  true
      ;   http_close_connection(Request)
      ).
  next_(spawned(Thread), _) :-
      !,
      debug(http(spawn), 'Handler spawned to thread ~w', [Thread]).
  next_(Connection, Request) :-
      (   downcase_atom(Connection, 'keep-alive'),
          http_requeue(Request)
      ->  true
      ;   http_close_connection(Request)
      ).
  946
  947
  %!  http_close_connection(+Request)
  %
  %   Close the connection that belongs to Request, using the streams
  %   from its pool(client(_,_,In,Out)) field and the address from its
  %   peer(Peer) field. See also http_requeue/1.

  http_close_connection(Request) :-
      memberchk(pool(client(_, _, ClientIn, ClientOut)), Request),
      memberchk(peer(Remote), Request),
      close_connection(Remote, ClientIn, ClientOut).
  956
  %!  close_connection(+Peer, +In, +Out)
  %
  %   Close the connection from the server to the client. Peer is only
  %   used for the debug message on topic http(connection). Errors
  %   while closing are currently silently ignored.

  close_connection(Peer, In, Out) :-
      debug(http(connection), 'Closing connection from ~p', [Peer]),
      force_close_stream(In),
      force_close_stream(Out).

  %   Close Stream using force(true), discarding any exception.
  force_close_stream(Stream) :-
      catch(close(Stream, [force(true)]), _, true).
  966
  %!  http_spawn(:Goal, +Options) is det.
  %
  %   Continue this connection on a new thread. A handler may call
  %   http_spawn/2 to start a new thread that continues processing the
  %   current request using Goal, while the original thread returns to
  %   the worker pool to process new requests. The new thread is
  %   always created detached. Options are passed to thread_create/3,
  %   except for:
  %
  %       * pool(+Pool)
  %       Interfaces to library(thread_pool), starting the thread
  %       on the given pool.
  %
  %   If the pool has no free threads, http_reply(busy) is thrown. If
  %   the pool does not exist, create_pool/1 is called (which consults
  %   the multifile hook http:create_pool/1) and, when that succeeds,
  %   the operation is retried. Other errors are re-thrown.

  http_spawn(Goal, Options) :-
      select_option(pool(Pool), Options, CreateOptions),
      !,
      current_output(CGI),
      catch(thread_create_in_pool(Pool,
                                  wrap_spawned(CGI, Goal), Thread,
                                  [detached(true)|CreateOptions]),
            Caught,
            true),
      (   var(Caught)                     % no exception: thread is running
      ->  http_spawned(Thread)
      ;   Caught = error(resource_error(threads_in_pool(_)), _)
      ->  throw(http_reply(busy))
      ;   Caught = error(existence_error(thread_pool, Pool), _),
          create_pool(Pool)
      ->  http_spawn(Goal, Options)
      ;   throw(Caught)
      ).
  http_spawn(Goal, Options) :-
      current_output(CGI),
      thread_create(wrap_spawned(CGI, Goal), Thread,
                    [detached(true)|Options]),
      http_spawned(Thread).
 1010
 %   wrap_spawned(+CGI, :Goal)
 %
 %   Body of a thread started by http_spawn/2: make CGI the current
 %   output, run Goal through http_wrap_spawned/3 and let next/2
 %   decide what to do with the connection.

 wrap_spawned(CGI, Goal) :-
     set_output(CGI),
     http_wrap_spawned(Goal, SpawnedRequest, SpawnedConnection),
     next(SpawnedConnection, SpawnedRequest).
 1015
 %!  create_pool(+Pool)
 %
 %   Lazy creation of worker-pools for the HTTP server. First the
 %   hook http:create_pool/1 is called. A permission error telling
 %   that the pool cannot be created is ignored, as this means the
 %   pool has already been created. If the hook fails, a default pool
 %   of size 10 is created and an informational message is printed.
 %   This should suffice for most typical use cases.

 create_pool(Pool) :-
     catch(http:create_pool(Pool),
           error(permission_error(create, thread_pool, Pool), _),
           true).
 create_pool(Pool) :-
     Message = httpd(created_pool(Pool)),
     print_message(informational, Message),
     thread_pool_create(Pool, 10, []).
 1030
 1031
 1032		 /*******************************
 1033		 *         WAIT POLICIES	*
 1034		 *******************************/
 1035
 :- meta_predicate thread_repeat_wait(0).

 %!  thread_repeat_wait(:Goal) is multi.
 %
 %   Acts as `repeat, thread_idle(Goal)`, using the average interval
 %   between iterations to choose the idle policy. long/2 classifies
 %   that interval: for `brief` Goal is called directly without
 %   thread_idle/2, otherwise thread_idle/2 is called with `short` or
 %   `long`.

 thread_repeat_wait(Goal) :-
     new_rate_mma(5, 1000, Rate),
     repeat,
       update_rate_mma(Rate, Interval),
       long(Interval, Idle),
       (   Idle \== brief
       ->  thread_idle(Goal, Idle)
       ;   call(Goal)
       ).
 1053
 %   long(+MMA, -Kind) is det.
 %
 %   Classify the average interval MMA (in seconds) between requests:
 %
 %     - `brief` if MMA < 0.05
 %     - `short` if MMA < 1
 %     - `long` otherwise
 %
 %   The class is computed before it is unified with Kind, so the
 %   predicate is steadfast: calling it with Kind already bound to the
 %   wrong class fails instead of falling through to a later class.

 long(MMA, Kind) :-
     (   MMA < 0.05
     ->  Class = brief
     ;   MMA < 1
     ->  Class = short
     ;   Class = long
     ),
     Kind = Class.
 1061
 %!  new_rate_mma(+N, +Resolution, -State) is det.
 %!  update_rate_mma(!State, -MMA) is det.
 %
 %   Implement a _Modified Moving Average_ that computes the average
 %   time between requests as an exponential moving average with
 %   alpha=1/N.
 %
 %   State is a term rstate(Base, Last, MaxI, Resolution, N, MMA),
 %   where Base is the time at which the state was created, MaxI is
 %   the value of the Prolog flag `max_tagged_integer` and both Last
 %   and MMA start at 0.
 %
 %   @arg Resolution is the time resolution in 1/Resolution seconds.
 %   All storage is done in integers to avoid the need for stack
 %   freezing in nb_setarg/3.
 %
 %   @see https://en.wikipedia.org/wiki/Moving_average

 new_rate_mma(N, Resolution, State) :-
     get_time(Base),
     current_prolog_flag(max_tagged_integer, MaxStamp),
     State = rstate(Base, 0, MaxStamp, Resolution, N, 0).
 1077
 %   update_rate_mma(!State, -MMA)
 %
 %   Record a new event in State, a term
 %   rstate(Base, Last, MaxI, Resolution, N, MMA0), and unify MMA with
 %   the updated moving average of the interval between events, in
 %   seconds. State is updated destructively using nb_setarg/3.
 %
 %   Time stamps are integers counting 1/Resolution seconds since
 %   Base. When the current stamp exceeds MaxI the state is rebased:
 %   Base becomes the current time and the last stamp restarts at 0.
 %   The interval is computed before rebasing and the last stamp is
 %   written exactly once. Storing the old, large stamp after a
 %   rebase would make the next interval hugely negative and corrupt
 %   the average.

 update_rate_mma(State, MMAr) :-
     State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
     get_time(Now),
     Stamp is round((Now-Base)*Resolution),
     Diff is Stamp-Last,                  % interval relative to the old base
     (   Stamp > MaxI
     ->  nb_setarg(1, State, Now),        % rebase: restart counting from Now
         nb_setarg(2, State, 0)
     ;   nb_setarg(2, State, Stamp)
     ),
     MMA is round(((N-1)*MMA0+Diff)/N),
     nb_setarg(6, State, MMA),
     MMAr is MMA/float(Resolution).
 1092
 1093
 1094                 /*******************************
 1095                 *            MESSAGES          *
 1096                 *******************************/
 1097
 :- multifile prolog:message/3.

 %   Message texts for the terms printed by this module using
 %   print_message/2.

 prolog:message(httpd_started_server(Address, Options)) -->
     [ 'Started server at '-[] ],
     http_root(Address, Options).
 prolog:message(httpd_stopped_worker(Worker, ExitStatus)) -->
     [ 'Stopped worker ~p: ~p'-[Worker, ExitStatus] ].
 prolog:message(httpd_restarted_worker(Worker)) -->
     [ 'Replaced aborted worker ~p'-[Worker] ].
 prolog:message(httpd(created_pool(PoolName))) -->
     [ 'Created thread-pool ~p of size 10'-[PoolName],
       nl,
       'Create this pool at startup-time or define the hook ',
       nl,
       'http:create_pool/1 to avoid this message and create a ',
       nl,
       'pool that fits the usage-profile.'
     ].
 1114
 %   http_root(+Address, +Options)//
 %
 %   Emit the landing page of the server at Address, as computed by
 %   landing_page/3, as a message fragment.

 http_root(Address, Options) -->
     { landing_page(Address, Landing, Options) },
     [ '~w'-[Landing] ].
 1118
 %   landing_page(+Address, -URI, +Options)
 %
 %   URI describes where the server listening on Address can be
 %   reached. Address is one of
 %
 %     - Host:Port: the URI is the entry page resolved against
 %       Scheme://Host, where the port is omitted if it is the
 %       default port for the scheme of the server.
 %     - unix_socket(Path): URI is a string describing the socket.
 %     - Port: handled as localhost:Port.

 landing_page(Host:Port, URI, Options) :-
     !,
     must_be(atom, Host),
     must_be(integer, Port),
     http_server_property(Port, scheme(Scheme)),
     (   default_port(Scheme, Port)
     ->  Format = '~w://~w',
         Args = [Scheme, Host]
     ;   Format = '~w://~w:~w',
         Args = [Scheme, Host, Port]
     ),
     format(atom(Base), Format, Args),
     entry_page(Base, URI, Options).
 landing_page(unix_socket(SocketPath), URI, _Options) :-
     !,
     format(string(URI), 'Unix domain socket "~w"', [SocketPath]).
 landing_page(Port, URI, Options) :-
     landing_page(localhost:Port, URI, Options).
 1134
 %   default_port(?Scheme, ?Port)
 %
 %   Port is the default port for the URI scheme Scheme.

 default_port(Scheme, Port) :-
     (   Scheme = http,
         Port = 80
     ;   Scheme = https,
         Port = 443
     ).
 1137
 1138entry_page(Base, URI, Options) :-
 1139    option(entry_page(Entry), Options),
 1140    !,
 1141    uri_resolve(Entry, Base, URI).
 1142entry_page(Base, URI, _) :-
 1143    http_absolute_location(root(.), Entry, []),
 1144    uri_resolve(Entry, Base, URI)