1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2024, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(thread_httpd, 39 [ http_current_server/2, % ?:Goal, ?Port 40 http_server_property/2, % ?Port, ?Property 41 http_server/2, % :Goal, +Options 42 http_workers/2, % +Port, ?WorkerCount 43 http_add_worker/2, % +Port, +Options 44 http_current_worker/2, % ?Port, ?ThreadID 45 http_stop_server/2, % +Port, +Options 46 http_spawn/2, % :Goal, +Options 47 48 http_requeue/1, % +Request 49 http_close_connection/1, % +Request 50 http_enough_workers/3 % +Queue, +Why, +Peer 51 ]). 52:- use_module(library(debug)). 53:- use_module(library(error)). 54:- use_module(library(option)). 55:- use_module(library(socket)). 56:- use_module(library(thread_pool)). 57:- use_module(library(gensym)). 58:- use_module(http_wrapper). 59:- use_module(http_path). 60 61:- autoload(library(uri), [uri_resolve/3]). 62:- autoload(library(aggregate), [aggregate_all/3]). 63 64:- predicate_options(http_server/2, 2, 65 [ port(any), 66 unix_socket(atom), 67 entry_page(atom), 68 tcp_socket(any), 69 workers(positive_integer), 70 timeout(number), 71 keep_alive_timeout(number), 72 silent(boolean), 73 ssl(list(any)), % if http/http_ssl_plugin is loaded 74 pass_to(system:thread_create/3, 3) 75 ]). 76:- predicate_options(http_spawn/2, 2, 77 [ pool(atom), 78 pass_to(system:thread_create/3, 3), 79 pass_to(thread_pool:thread_create_in_pool/4, 4) 80 ]). 81:- predicate_options(http_add_worker/2, 2, 82 [ timeout(number), 83 keep_alive_timeout(number), 84 max_idle_time(number), 85 pass_to(system:thread_create/3, 3) 86 ]).
114:- meta_predicate 115 http_server( , ), 116 http_current_server( , ), 117 http_spawn( , ). 118 119:- dynamic 120 current_server/6, % Port, Goal, Thread, Queue, Scheme, StartTime 121 queue_worker/2, % Queue, ThreadID 122 queue_options/2. % Queue, Options 123 124:- multifile 125 make_socket_hook/3, 126 accept_hook/2, 127 close_hook/1, 128 open_client_hook/6, 129 discard_client_hook/1, 130 http:create_pool/1, 131 http:schedule_workers/1. 132 133:- meta_predicate 134 thread_repeat_wait( ).
main
thread.
If you need to control resource usage you may consider the
spawn
option of http_handler/3 and library(thread_pool).true
(default false
), do not print an informational
message that the server was started.A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
:- use_module(library(http/thread_httpd)). :- use_module(library(http/http_dispatch)). start_server(Port) :- http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
199http_server(Goal, M:Options0) :- 200 server_address(Address, Options0), 201 !, 202 make_socket(Address, M:Options0, Options), 203 create_workers(Options), 204 create_server(Goal, Address, Options), 205 ( option(silent(true), Options0) 206 -> true 207 ; print_message(informational, 208 httpd_started_server(Address, Options0)) 209 ). 210http_server(_Goal, _:Options0) :- 211 existence_error(server_address, Options0). 212 213server_address(Address, Options) :- 214 ( option(port(Port), Options) 215 -> Address = Port 216 ; option(unix_socket(Path), Options) 217 -> Address = unix_socket(Path) 218 ). 219 220address_port(_IFace:Port, Port) :- !. 221address_port(unix_socket(Path), Path) :- !. 222address_port(Address, Address) :- !. 223 224tcp_address(Port) :- 225 var(Port), 226 !. 227tcp_address(Port) :- 228 integer(Port), 229 !. 230tcp_address(_Iface:_Port). 231 232address_domain(localhost:_Port, Domain) => 233 Domain = inet. 234address_domain(Iface:_Port, Domain) => 235 ( catch(ip_name(IP, Iface), error(_,_), fail), 236 functor(IP, ip, 8) 237 -> Domain = inet6 238 ; Domain = inet 239 ). 240address_domain(_, Domain) => 241 Domain = inet.
queue(QueueId)
.
252make_socket(Address, M:Options0, Options) :- 253 tcp_address(Address), 254 make_socket_hook(Address, M:Options0, Options), 255 !. 256make_socket(Address, _:Options0, Options) :- 257 option(tcp_socket(_), Options0), 258 !, 259 make_addr_atom('httpd', Address, Queue), 260 Options = [ queue(Queue) 261 | Options0 262 ]. 263make_socket(Address, _:Options0, Options) :- 264 tcp_address(Address), 265 !, 266 address_domain(Address, Domain), 267 socket_create(Socket, [domain(Domain)]), 268 tcp_setopt(Socket, reuseaddr), 269 tcp_bind(Socket, Address), 270 tcp_listen(Socket, 64), 271 make_addr_atom('httpd', Address, Queue), 272 Options = [ queue(Queue), 273 tcp_socket(Socket) 274 | Options0 275 ]. 276:- if(current_predicate(unix_domain_socket/1)). 277make_socket(Address, _:Options0, Options) :- 278 Address = unix_socket(Path), 279 !, 280 unix_domain_socket(Socket), 281 tcp_bind(Socket, Path), 282 tcp_listen(Socket, 64), 283 make_addr_atom('httpd', Address, Queue), 284 Options = [ queue(Queue), 285 tcp_socket(Socket) 286 | Options0 287 ]. 288:- endif.
295make_addr_atom(Scheme, Address, Atom) :- 296 phrase(address_parts(Address), Parts), 297 atomic_list_concat([Scheme,@|Parts], Atom). 298 299address_parts(Var) --> 300 { var(Var), 301 !, 302 instantiation_error(Var) 303 }. 304address_parts(Atomic) --> 305 { atomic(Atomic) }, 306 !, 307 [Atomic]. 308address_parts(Host:Port) --> 309 !, 310 address_parts(Host), [:], address_parts(Port). 311address_parts(ip(A,B,C,D)) --> 312 !, 313 [ A, '.', B, '.', C, '.', D ]. 314address_parts(unix_socket(Path)) --> 315 [Path]. 316address_parts(Address) --> 317 { domain_error(http_server_address, Address) }.
325create_server(Goal, Address, Options) :- 326 get_time(StartTime), 327 memberchk(queue(Queue), Options), 328 scheme(Scheme, Options), 329 autoload_https(Scheme), 330 address_port(Address, Port), 331 make_addr_atom(Scheme, Port, Alias), 332 thread_self(Initiator), 333 thread_create(accept_server(Goal, Initiator, Options), _, 334 [ alias(Alias) 335 ]), 336 thread_get_message(server_started), 337 assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)). 338 339scheme(Scheme, Options) :- 340 option(scheme(Scheme), Options), 341 !. 342scheme(Scheme, Options) :- 343 ( option(ssl(_), Options) 344 ; option(ssl_instance(_), Options) 345 ), 346 !, 347 Scheme = https. 348scheme(http, _). 349 350autoload_https(https) :- 351 \+ clause(accept_hook(_Goal, _Options), _), 352 exists_source(library(http/http_ssl_plugin)), 353 !, 354 use_module(library(http/http_ssl_plugin)). 355autoload_https(_).
363http_current_server(Goal, Port) :-
364 current_server(Port, Goal, _, _, _, _).
http
or https
.380http_server_property(_:Port, Property) :- 381 integer(Port), 382 !, 383 server_property(Property, Port). 384http_server_property(Port, Property) :- 385 server_property(Property, Port). 386 387server_property(goal(Goal), Port) :- 388 current_server(Port, Goal, _, _, _, _). 389server_property(scheme(Scheme), Port) :- 390 current_server(Port, _, _, _, Scheme, _). 391server_property(start_time(Time), Port) :- 392 current_server(Port, _, _, _, _, Time).
405http_workers(Port, Workers) :- 406 integer(Workers), 407 !, 408 must_be(ground, Port), 409 ( current_server(Port, _, _, Queue, _, _) 410 -> resize_pool(Queue, Workers) 411 ; existence_error(http_server, Port) 412 ). 413http_workers(Port, Workers) :- 414 current_server(Port, _, _, Queue, _, _), 415 aggregate_all(count, queue_worker(Queue, _Worker), Workers).
428http_add_worker(Port, Options) :- 429 must_be(ground, Port), 430 current_server(Port, _, _, Queue, _, _), 431 !, 432 queue_options(Queue, QueueOptions), 433 merge_options(Options, QueueOptions, WorkerOptions), 434 atom_concat(Queue, '_', AliasBase), 435 create_workers(1, 1, Queue, AliasBase, WorkerOptions). 436http_add_worker(Port, _) :- 437 existence_error(http_server, Port).
447http_current_worker(Port, ThreadID) :-
448 current_server(Port, _, _, Queue, _, _),
449 queue_worker(Queue, ThreadID).
457accept_server(Goal, Initiator, Options) :- 458 Ex = http_stop(Stopper), 459 catch(accept_server2(Goal, Initiator, Options), Ex, true), 460 thread_self(Thread), 461 debug(http(stop), '[~p]: accept server received ~p', [Thread, Ex]), 462 retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)), 463 close_pending_accepts(Queue), 464 close_server_socket(Options), 465 thread_send_message(Stopper, http_stopped). 466 467accept_server2(Goal, Initiator, Options) :- 468 thread_send_message(Initiator, server_started), 469 repeat, 470 ( catch(accept_server3(Goal, Options), E, true) 471 -> ( var(E) 472 -> fail 473 ; accept_rethrow_error(E) 474 -> throw(E) 475 ; print_message(error, E), 476 fail 477 ) 478 ; print_message(error, % internal error 479 goal_failed(accept_server3(Goal, Options))), 480 fail 481 ). 482 483accept_server3(Goal, Options) :- 484 accept_hook(Goal, Options), 485 !. 486accept_server3(Goal, Options) :- 487 memberchk(tcp_socket(Socket), Options), 488 memberchk(queue(Queue), Options), 489 debug(http(connection), 'Waiting for connection', []), 490 tcp_accept(Socket, Client, Peer), 491 sig_atomic(send_to_worker(Queue, Client, Goal, Peer)), 492 http_enough_workers(Queue, accept, Peer). 493 494send_to_worker(Queue, Client, Goal, Peer) :- 495 debug(http(connection), 'New HTTP connection from ~p', [Peer]), 496 thread_send_message(Queue, tcp_client(Client, Goal, Peer)). 497 498accept_rethrow_error(http_stop(_)). 499accept_rethrow_error('$aborted').
506close_server_socket(Options) :- 507 close_hook(Options), 508 !. 509close_server_socket(Options) :- 510 memberchk(tcp_socket(Socket), Options), 511 !, 512 tcp_close_socket(Socket).
516close_pending_accepts(Queue) :- 517 ( thread_get_message(Queue, Msg, [timeout(0)]) 518 -> close_client(Msg), 519 close_pending_accepts(Queue) 520 ; true 521 ). 522 523close_client(tcp_client(Client, _Goal, _0Peer)) => 524 debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]), 525 tcp_close_socket(Client). 526close_client(Msg) => 527 ( discard_client_hook(Msg) 528 -> true 529 ; print_message(warning, http_close_client(Msg)) 530 ).
540http_stop_server(Host:Port, Options) :- % e.g., localhost:4000 541 ground(Host), 542 !, 543 http_stop_server(Port, Options). 544http_stop_server(Port, _Options) :- 545 http_workers(Port, 0), % checks Port is ground 546 current_server(Port, _, Thread, Queue, _Scheme, _Start), 547 retractall(queue_options(Queue, _)), 548 debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]), 549 thread_self(Stopper), 550 thread_signal(Thread, throw(http_stop(Stopper))), 551 ( thread_get_message(Stopper, http_stopped, [timeout(0.1)]) 552 -> true 553 ; catch(connect(localhost:Port), _, true) 554 ), 555 thread_join(Thread, _0Status), 556 debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]), 557 message_queue_destroy(Queue). 558 559connect(Address) :- 560 setup_call_cleanup( 561 tcp_socket(Socket), 562 tcp_connect(Socket, Address), 563 tcp_close_socket(Socket)).
571http_enough_workers(Queue, _Why, _Peer) :- 572 message_queue_property(Queue, waiting(_0)), 573 !, 574 debug(http(scheduler), '~D waiting for work; ok', [_0]). 575http_enough_workers(Queue, Why, Peer) :- 576 message_queue_property(Queue, size(Size)), 577 ( enough(Size, Why) 578 -> debug(http(scheduler), '~D in queue; ok', [Size]) 579 ; current_server(Port, _, _, Queue, _, _), 580 Data = _{ port:Port, 581 reason:Why, 582 peer:Peer, 583 waiting:Size, 584 queue:Queue 585 }, 586 debug(http(scheduler), 'Asking to reschedule: ~p', [Data]), 587 catch(http:schedule_workers(Data), 588 Error, 589 print_message(error, Error)) 590 -> true 591 ; true 592 ). 593 594enough(0, _). 595enough(1, keep_alive). % I will be ready myself
accept
for a new connection or keep_alive
if a
worker tries to reschedule itself.
Note that, when called with reason:accept
, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to thread dedicated to
dynamic worker-pool management.
625 /******************************* 626 * WORKER QUEUE OPERATIONS * 627 *******************************/
634create_workers(Options) :- 635 option(workers(N), Options, 5), 636 option(queue(Queue), Options), 637 catch(message_queue_create(Queue), _, true), 638 atom_concat(Queue, '_', AliasBase), 639 create_workers(1, N, Queue, AliasBase, Options), 640 assert(queue_options(Queue, Options)). 641 642create_workers(I, N, _, _, _) :- 643 I > N, 644 !. 645create_workers(I, N, Queue, AliasBase, Options) :- 646 gensym(AliasBase, Alias), 647 thread_create(http_worker(Options), Id, 648 [ alias(Alias) 649 | Options 650 ]), 651 assertz(queue_worker(Queue, Id)), 652 I2 is I + 1, 653 create_workers(I2, N, Queue, AliasBase, Options).
661resize_pool(Queue, Size) :-
662 findall(W, queue_worker(Queue, W), Workers),
663 length(Workers, Now),
664 ( Now < Size
665 -> queue_options(Queue, Options),
666 atom_concat(Queue, '_', AliasBase),
667 I0 is Now+1,
668 create_workers(I0, Size, Queue, AliasBase, Options)
669 ; Now == Size
670 -> true
671 ; Now > Size
672 -> Excess is Now - Size,
673 thread_self(Me),
674 forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
675 forall(between(1, Excess, _), thread_get_message(quitted(_)))
676 ).
If the message quit(Sender)
is read from the queue, the worker
stops.
687http_worker(Options) :- 688 debug(http(scheduler), 'New worker', []), 689 prolog_listen(this_thread_exit, done_worker), 690 option(queue(Queue), Options), 691 option(max_idle_time(MaxIdle), Options, infinite), 692 thread_repeat_wait(get_work(Queue, Message, MaxIdle)), 693 debug(http(worker), 'Waiting for a job ...', []), 694 debug(http(worker), 'Got job ~p', [Message]), 695 ( Message = quit(Sender) 696 -> !, 697 thread_self(Self), 698 thread_detach(Self), 699 ( Sender == idle 700 -> true 701 ; retract(queue_worker(Queue, Self)), 702 thread_send_message(Sender, quitted(Self)) 703 ) 704 ; open_client(Message, Queue, Goal, In, Out, 705 Options, ClientOptions), 706 ( catch(http_process(Goal, In, Out, ClientOptions), 707 Error, true) 708 -> true 709 ; Error = goal_failed(http_process/4) 710 ), 711 ( var(Error) 712 -> fail 713 ; current_message_level(Error, Level), 714 print_message(Level, Error), 715 memberchk(peer(Peer), ClientOptions), 716 close_connection(Peer, In, Out), 717 fail 718 ) 719 ). 720 721get_work(Queue, Message, infinite) :- 722 !, 723 thread_get_message(Queue, Message). 724get_work(Queue, Message, MaxIdle) :- 725 ( thread_get_message(Queue, Message, [timeout(MaxIdle)]) 726 -> true 727 ; Message = quit(idle) 728 ).
737open_client(requeue(In, Out, Goal, ClOpts), 738 _, Goal, In, Out, Opts, ClOpts) :- 739 !, 740 memberchk(peer(Peer), ClOpts), 741 option(keep_alive_timeout(KeepAliveTMO), Opts, 2), 742 check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out). 743open_client(Message, Queue, Goal, In, Out, Opts, 744 [ pool(client(Queue, Goal, In, Out)), 745 timeout(Timeout) 746 | Options 747 ]) :- 748 catch(open_client(Message, Goal, In, Out, Options, Opts), 749 E, report_error(E)), 750 option(timeout(Timeout), Opts, 60), 751 ( debugging(http(connection)) 752 -> memberchk(peer(Peer), Options), 753 debug(http(connection), 'Opened connection from ~p', [Peer]) 754 ; true 755 ).
761open_client(Message, Goal, In, Out, ClientOptions, Options) :- 762 open_client_hook(Message, Goal, In, Out, ClientOptions, Options), 763 !. 764open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out, 765 [ peer(Peer), 766 protocol(http) 767 ], _) :- 768 tcp_open_socket(Socket, In, Out). 769 770report_error(E) :- 771 print_message(error, E), 772 fail.
781check_keep_alive_connection(In, TMO, Peer, In, Out) :-
782 stream_property(In, timeout(Old)),
783 set_stream(In, timeout(TMO)),
784 debug(http(keep_alive), 'Waiting for keep-alive ...', []),
785 catch(peek_code(In, Code), E, true),
786 ( var(E), % no exception
787 Code \== -1 % no end-of-file
788 -> set_stream(In, timeout(Old)),
789 debug(http(keep_alive), '\tre-using keep-alive connection', [])
790 ; ( Code == -1
791 -> debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
792 ; debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
793 ),
794 close_connection(Peer, In, Out),
795 fail
796 ).
805done_worker :- 806 thread_self(Self), 807 thread_detach(Self), 808 retract(queue_worker(Queue, Self)), 809 thread_property(Self, status(Status)), 810 !, 811 ( catch(recreate_worker(Status, Queue), _, fail) 812 -> print_message(informational, 813 httpd_restarted_worker(Self)) 814 ; done_status_message_level(Status, Level), 815 print_message(Level, 816 httpd_stopped_worker(Self, Status)) 817 ). 818done_worker :- % received quit(Sender) 819 thread_self(Self), 820 thread_property(Self, status(Status)), 821 done_status_message_level(Status, Level), 822 print_message(Level, 823 httpd_stopped_worker(Self, Status)). 824 825done_status_message_level(true, silent) :- !. 826done_status_message_level(exception('$aborted'), silent) :- !. 827done_status_message_level(_, informational).
The first clause deals with the possibility that we cannot write to
user_error
. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
842recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :- 843 halt(2). 844recreate_worker(exception(Error), Queue) :- 845 recreate_on_error(Error), 846 queue_options(Queue, Options), 847 atom_concat(Queue, '_', AliasBase), 848 create_workers(1, 1, Queue, AliasBase, Options). 849 850recreate_on_error('$aborted'). 851recreate_on_error(time_limit_exceeded).
860:- multifile 861 message_level/2. 862 863message_level(error(io_error(read, _), _), silent). 864message_level(error(socket_error(epipe,_), _), silent). 865message_level(error(http_write_short(_Obj,_Written), _), silent). 866message_level(error(timeout_error(read, _), _), informational). 867message_level(keep_alive_timeout, silent). 868 869current_message_level(Term, Level) :- 870 ( message_level(Term, Level) 871 -> true 872 ; Level = error 873 ).
881http_requeue(Header) :- 882 requeue_header(Header, ClientOptions), 883 memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions), 884 memberchk(peer(Peer), ClientOptions), 885 http_enough_workers(Queue, keep_alive, Peer), 886 thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)), 887 !. 888http_requeue(Header) :- 889 debug(http(error), 'Re-queue failed: ~p', [Header]), 890 fail. 891 892requeue_header([], []). 893requeue_header([H|T0], [H|T]) :- 894 requeue_keep(H), 895 !, 896 requeue_header(T0, T). 897requeue_header([_|T0], T) :- 898 requeue_header(T0, T). 899 900requeue_keep(pool(_)). 901requeue_keep(peer(_)). 902requeue_keep(protocol(_)).
909http_process(Goal, In, Out, Options) :- 910 debug(http(server), 'Running server goal ~p on ~p -> ~p', 911 [Goal, In, Out]), 912 option(timeout(TMO), Options, 60), 913 set_stream(In, timeout(TMO)), 914 set_stream(Out, timeout(TMO)), 915 http_wrapper(Goal, In, Out, Connection, 916 [ request(Request) 917 | Options 918 ]), 919 next(Connection, Request). 920 921next(Connection, Request) :- 922 next_(Connection, Request), !. 923next(Connection, Request) :- 924 print_message(warning, goal_failed(next(Connection,Request))). 925 926next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :- 927 !, 928 memberchk(pool(client(_Queue, _Goal, In, Out)), Request), 929 ( catch(call(SwitchGoal, In, Out), E, 930 ( print_message(error, E), 931 fail)) 932 -> true 933 ; http_close_connection(Request) 934 ). 935next_(spawned(ThreadId), _) :- 936 !, 937 debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]). 938next_(Connection, Request) :- 939 downcase_atom(Connection, 'keep-alive'), 940 http_requeue(Request), 941 !. 942next_(_, Request) :- 943 http_close_connection(Request).
950http_close_connection(Request) :-
951 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
952 memberchk(peer(Peer), Request),
953 close_connection(Peer, In, Out).
960close_connection(Peer, In, Out) :-
961 debug(http(connection), 'Closing connection from ~p', [Peer]),
962 catch(close(In, [force(true)]), _, true),
963 catch(close(Out, [force(true)]), _, true).
If a pool does not exist, this predicate calls the multifile hook http:create_pool/1 to create it. If this predicate succeeds the operation is retried.
981http_spawn(Goal, Options) :- 982 select_option(pool(Pool), Options, ThreadOptions), 983 !, 984 current_output(CGI), 985 catch(thread_create_in_pool(Pool, 986 wrap_spawned(CGI, Goal), Id, 987 [ detached(true) 988 | ThreadOptions 989 ]), 990 Error, 991 true), 992 ( var(Error) 993 -> http_spawned(Id) 994 ; Error = error(resource_error(threads_in_pool(_)), _) 995 -> throw(http_reply(busy)) 996 ; Error = error(existence_error(thread_pool, Pool), _), 997 create_pool(Pool) 998 -> http_spawn(Goal, Options) 999 ; throw(Error) 1000 ). 1001http_spawn(Goal, Options) :- 1002 current_output(CGI), 1003 thread_create(wrap_spawned(CGI, Goal), Id, 1004 [ detached(true) 1005 | Options 1006 ]), 1007 http_spawned(Id). 1008 1009wrap_spawned(CGI, Goal) :- 1010 set_output(CGI), 1011 http_wrap_spawned(Goal, Request, Connection), 1012 next(Connection, Request).
1022create_pool(Pool) :- 1023 E = error(permission_error(create, thread_pool, Pool), _), 1024 catch(http:create_pool(Pool), E, true). 1025create_pool(Pool) :- 1026 print_message(informational, httpd(created_pool(Pool))), 1027 thread_pool_create(Pool, 10, []). 1028 1029 1030 /******************************* 1031 * WAIT POLICIES * 1032 *******************************/ 1033 1034:- meta_predicate 1035 thread_repeat_wait( ).
repeat, thread_idle(Goal)
, choosing whether to use a
long
or short
idle time based on the average firing rate.1042thread_repeat_wait(Goal) :- 1043 new_rate_mma(5, 1000, State), 1044 repeat, 1045 update_rate_mma(State, MMA), 1046 long(MMA, IsLong), 1047 ( IsLong == brief 1048 -> call(Goal) 1049 ; thread_idle(Goal, IsLong) 1050 ). 1051 1052long(MMA, brief) :- 1053 MMA < 0.05, 1054 !. 1055long(MMA, short) :- 1056 MMA < 1, 1057 !. 1058long(_, long).
1072new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :- 1073 current_prolog_flag(max_tagged_integer, MaxI), 1074 get_time(Base). 1075 1076update_rate_mma(State, MMAr) :- 1077 State = rstate(Base, Last, MaxI, Resolution, N, MMA0), 1078 get_time(Now), 1079 Stamp is round((Now-Base)*Resolution), 1080 ( Stamp > MaxI 1081 -> nb_setarg(1, State, Now), 1082 nb_setarg(2, State, 0) 1083 ; true 1084 ), 1085 Diff is Stamp-Last, 1086 nb_setarg(2, State, Stamp), 1087 MMA is round(((N-1)*MMA0+Diff)/N), 1088 nb_setarg(6, State, MMA), 1089 MMAr is MMA/float(Resolution). 1090 1091 1092 /******************************* 1093 * MESSAGES * 1094 *******************************/ 1095 1096:- multifile 1097 prolog:message/3. 1098 1099prologmessage(httpd_started_server(Port, Options)) --> 1100 [ 'Started server at '-[] ], 1101 http_root(Port, Options). 1102prologmessage(httpd_stopped_worker(Self, Status)) --> 1103 [ 'Stopped worker ~p: ~p'-[Self, Status] ]. 1104prologmessage(httpd_restarted_worker(Self)) --> 1105 [ 'Replaced aborted worker ~p'-[Self] ]. 1106prologmessage(httpd(created_pool(Pool))) --> 1107 [ 'Created thread-pool ~p of size 10'-[Pool], nl, 1108 'Create this pool at startup-time or define the hook ', nl, 1109 'http:create_pool/1 to avoid this message and create a ', nl, 1110 'pool that fits the usage-profile.' 1111 ]. 1112 1113http_root(Address, Options) --> 1114 { landing_page(Address, URI, Options) }, 1115 [ '~w'-[URI] ]. 1116 1117landing_page(Host:Port, URI, Options) :- 1118 !, 1119 must_be(atom, Host), 1120 must_be(integer, Port), 1121 http_server_property(Port, scheme(Scheme)), 1122 ( default_port(Scheme, Port) 1123 -> format(atom(Base), '~w://~w', [Scheme, Host]) 1124 ; format(atom(Base), '~w://~w:~w', [Scheme, Host, Port]) 1125 ), 1126 entry_page(Base, URI, Options). 1127landing_page(unix_socket(Path), URI, _Options) :- 1128 !, 1129 format(string(URI), 'Unix domain socket "~w"', [Path]). 1130landing_page(Port, URI, Options) :- 1131 landing_page(localhost:Port, URI, Options). 1132 1133default_port(http, 80). 1134default_port(https, 443). 1135 1136entry_page(Base, URI, Options) :- 1137 option(entry_page(Entry), Options), 1138 !, 1139 uri_resolve(Entry, Base, URI). 1140entry_page(Base, URI, _) :- 1141 http_absolute_location(root(.), Entry, []), 1142 uri_resolve(Entry, Base, URI)
Threaded HTTP server
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially uses a lot of memory and 8 requests that use a lot of CPU resources.
On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.
Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */