1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2022, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 35*/ 36 37:- module(thread_httpd, 38 [ http_current_server/2, % ?:Goal, ?Port 39 http_server_property/2, % ?Port, ?Property 40 http_server/2, % :Goal, +Options 41 http_workers/2, % +Port, ?WorkerCount 42 http_add_worker/2, % +Port, +Options 43 http_current_worker/2, % ?Port, ?ThreadID 44 http_stop_server/2, % +Port, +Options 45 http_spawn/2, % :Goal, +Options 46 47 http_requeue/1, % +Request 48 http_close_connection/1, % +Request 49 http_enough_workers/3 % +Queue, +Why, +Peer 50 ]). 51:- use_module(library(debug)). 52:- use_module(library(error)). 53:- use_module(library(option)). 54:- use_module(library(socket)). 55:- use_module(library(thread_pool)). 56:- use_module(library(gensym)). 57:- use_module(http_wrapper). 58:- use_module(http_path). 59 60:- autoload(library(uri), [uri_resolve/3]). 61 62:- predicate_options(http_server/2, 2, 63 [ port(any), 64 unix_socket(atom), 65 entry_page(atom), 66 tcp_socket(any), 67 workers(positive_integer), 68 timeout(number), 69 keep_alive_timeout(number), 70 silent(boolean), 71 ssl(list(any)), % if http/http_ssl_plugin is loaded 72 pass_to(system:thread_create/3, 3) 73 ]). 74:- predicate_options(http_spawn/2, 2, 75 [ pool(atom), 76 pass_to(system:thread_create/3, 3), 77 pass_to(thread_pool:thread_create_in_pool/4, 4) 78 ]). 79:- predicate_options(http_add_worker/2, 2, 80 [ timeout(number), 81 keep_alive_timeout(number), 82 max_idle_time(number), 83 pass_to(system:thread_create/3, 3) 84 ]).
112:- meta_predicate 113 http_server( , ), 114 http_current_server( , ), 115 http_spawn( , ). 116 117:- dynamic 118 current_server/6, % Port, Goal, Thread, Queue, Scheme, StartTime 119 queue_worker/2, % Queue, ThreadID 120 queue_options/2. % Queue, Options 121 122:- multifile 123 make_socket_hook/3, 124 accept_hook/2, 125 close_hook/1, 126 open_client_hook/6, 127 discard_client_hook/1, 128 http:create_pool/1, 129 http:schedule_workers/1. 130 131:- meta_predicate 132 thread_repeat_wait( ).
main
thread.
If you need to control resource usage you may consider the
spawn
option of http_handler/3 and library(thread_pool).true
(default false
), do not print an informational
message that the server was started.A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
:- use_module(library(http/thread_httpd)). :- use_module(library(http/http_dispatch)). start_server(Port) :- http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
197http_server(Goal, M:Options0) :- 198 server_address(Address, Options0), 199 !, 200 make_socket(Address, M:Options0, Options), 201 create_workers(Options), 202 create_server(Goal, Address, Options), 203 ( option(silent(true), Options0) 204 -> true 205 ; print_message(informational, 206 httpd_started_server(Address, Options0)) 207 ). 208http_server(_Goal, _:Options0) :- 209 existence_error(server_address, Options0). 210 211server_address(Address, Options) :- 212 ( option(port(Port), Options) 213 -> Address = Port 214 ; option(unix_socket(Path), Options) 215 -> Address = unix_socket(Path) 216 ). 217 218address_port(_IFace:Port, Port) :- !. 219address_port(unix_socket(Path), Path) :- !. 220address_port(Address, Address) :- !. 221 222tcp_address(Port) :- 223 var(Port), 224 !. 225tcp_address(Port) :- 226 integer(Port), 227 !. 228tcp_address(_Iface:_Port). 229 230address_domain(localhost:_Port, Domain) => 231 Domain = inet. 232address_domain(Iface:_Port, Domain) => 233 ( catch(ip_name(IP, Iface), error(_,_), fail), 234 functor(IP, ip, 8) 235 -> Domain = inet6 236 ; Domain = inet 237 ). 238address_domain(_, Domain) => 239 Domain = inet.
queue(QueueId)
.
250make_socket(Address, M:Options0, Options) :- 251 tcp_address(Address), 252 make_socket_hook(Address, M:Options0, Options), 253 !. 254make_socket(Address, _:Options0, Options) :- 255 option(tcp_socket(_), Options0), 256 !, 257 make_addr_atom('httpd', Address, Queue), 258 Options = [ queue(Queue) 259 | Options0 260 ]. 261make_socket(Address, _:Options0, Options) :- 262 tcp_address(Address), 263 !, 264 address_domain(Address, Domain), 265 socket_create(Socket, [domain(Domain)]), 266 tcp_setopt(Socket, reuseaddr), 267 tcp_bind(Socket, Address), 268 tcp_listen(Socket, 64), 269 make_addr_atom('httpd', Address, Queue), 270 Options = [ queue(Queue), 271 tcp_socket(Socket) 272 | Options0 273 ]. 274:- if(current_predicate(unix_domain_socket/1)). 275make_socket(Address, _:Options0, Options) :- 276 Address = unix_socket(Path), 277 !, 278 unix_domain_socket(Socket), 279 tcp_bind(Socket, Path), 280 tcp_listen(Socket, 64), 281 make_addr_atom('httpd', Address, Queue), 282 Options = [ queue(Queue), 283 tcp_socket(Socket) 284 | Options0 285 ]. 286:- endif.
293make_addr_atom(Scheme, Address, Atom) :- 294 phrase(address_parts(Address), Parts), 295 atomic_list_concat([Scheme,@|Parts], Atom). 296 297address_parts(Var) --> 298 { var(Var), 299 !, 300 instantiation_error(Var) 301 }. 302address_parts(Atomic) --> 303 { atomic(Atomic) }, 304 !, 305 [Atomic]. 306address_parts(Host:Port) --> 307 !, 308 address_parts(Host), [:], address_parts(Port). 309address_parts(ip(A,B,C,D)) --> 310 !, 311 [ A, '.', B, '.', C, '.', D ]. 312address_parts(unix_socket(Path)) --> 313 [Path]. 314address_parts(Address) --> 315 { domain_error(http_server_address, Address) }.
323create_server(Goal, Address, Options) :- 324 get_time(StartTime), 325 memberchk(queue(Queue), Options), 326 scheme(Scheme, Options), 327 autoload_https(Scheme), 328 address_port(Address, Port), 329 make_addr_atom(Scheme, Port, Alias), 330 thread_self(Initiator), 331 thread_create(accept_server(Goal, Initiator, Options), _, 332 [ alias(Alias) 333 ]), 334 thread_get_message(server_started), 335 assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)). 336 337scheme(Scheme, Options) :- 338 option(scheme(Scheme), Options), 339 !. 340scheme(Scheme, Options) :- 341 ( option(ssl(_), Options) 342 ; option(ssl_instance(_), Options) 343 ), 344 !, 345 Scheme = https. 346scheme(http, _). 347 348autoload_https(https) :- 349 \+ clause(accept_hook(_Goal, _Options), _), 350 exists_source(library(http/http_ssl_plugin)), 351 !, 352 use_module(library(http/http_ssl_plugin)). 353autoload_https(_).
361http_current_server(Goal, Port) :-
362 current_server(Port, Goal, _, _, _, _).
http
or https
.378http_server_property(_:Port, Property) :- 379 integer(Port), 380 !, 381 server_property(Property, Port). 382http_server_property(Port, Property) :- 383 server_property(Property, Port). 384 385server_property(goal(Goal), Port) :- 386 current_server(Port, Goal, _, _, _, _). 387server_property(scheme(Scheme), Port) :- 388 current_server(Port, _, _, _, Scheme, _). 389server_property(start_time(Time), Port) :- 390 current_server(Port, _, _, _, _, Time).
400http_workers(Port, Workers) :- 401 must_be(ground, Port), 402 current_server(Port, _, _, Queue, _, _), 403 !, 404 ( integer(Workers) 405 -> resize_pool(Queue, Workers) 406 ; findall(W, queue_worker(Queue, W), WorkerIDs), 407 length(WorkerIDs, Workers) 408 ). 409http_workers(Port, _) :- 410 existence_error(http_server, Port).
423http_add_worker(Port, Options) :- 424 must_be(ground, Port), 425 current_server(Port, _, _, Queue, _, _), 426 !, 427 queue_options(Queue, QueueOptions), 428 merge_options(Options, QueueOptions, WorkerOptions), 429 atom_concat(Queue, '_', AliasBase), 430 create_workers(1, 1, Queue, AliasBase, WorkerOptions). 431http_add_worker(Port, _) :- 432 existence_error(http_server, Port).
442http_current_worker(Port, ThreadID) :-
443 current_server(Port, _, _, Queue, _, _),
444 queue_worker(Queue, ThreadID).
452accept_server(Goal, Initiator, Options) :- 453 catch(accept_server2(Goal, Initiator, Options), http_stop, true), 454 thread_self(Thread), 455 debug(http(stop), '[~p]: accept server received http_stop', [Thread]), 456 retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)), 457 close_pending_accepts(Queue), 458 close_server_socket(Options). 459 460accept_server2(Goal, Initiator, Options) :- 461 thread_send_message(Initiator, server_started), 462 repeat, 463 ( catch(accept_server3(Goal, Options), E, true) 464 -> ( var(E) 465 -> fail 466 ; accept_rethrow_error(E) 467 -> throw(E) 468 ; print_message(error, E), 469 fail 470 ) 471 ; print_message(error, % internal error 472 goal_failed(accept_server3(Goal, Options))), 473 fail 474 ). 475 476accept_server3(Goal, Options) :- 477 accept_hook(Goal, Options), 478 !. 479accept_server3(Goal, Options) :- 480 memberchk(tcp_socket(Socket), Options), 481 memberchk(queue(Queue), Options), 482 debug(http(connection), 'Waiting for connection', []), 483 tcp_accept(Socket, Client, Peer), 484 sig_atomic(send_to_worker(Queue, Client, Goal, Peer)), 485 http_enough_workers(Queue, accept, Peer). 486 487send_to_worker(Queue, Client, Goal, Peer) :- 488 debug(http(connection), 'New HTTP connection from ~p', [Peer]), 489 thread_send_message(Queue, tcp_client(Client, Goal, Peer)). 490 491accept_rethrow_error(http_stop). 492accept_rethrow_error('$aborted').
499close_server_socket(Options) :- 500 close_hook(Options), 501 !. 502close_server_socket(Options) :- 503 memberchk(tcp_socket(Socket), Options), 504 !, 505 tcp_close_socket(Socket).
509close_pending_accepts(Queue) :- 510 ( thread_get_message(Queue, Msg, [timeout(0)]) 511 -> close_client(Msg), 512 close_pending_accepts(Queue) 513 ; true 514 ). 515 516close_client(tcp_client(Client, _Goal, _0Peer)) => 517 debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]), 518 tcp_close_socket(Client). 519close_client(Msg) => 520 ( discard_client_hook(Msg) 521 -> true 522 ; print_message(warning, http_close_client(Msg)) 523 ).
533http_stop_server(Host:Port, Options) :- % e.g., localhost:4000 534 ground(Host), 535 !, 536 http_stop_server(Port, Options). 537http_stop_server(Port, _Options) :- 538 http_workers(Port, 0), % checks Port is ground 539 current_server(Port, _, Thread, Queue, _Scheme, _Start), 540 retractall(queue_options(Queue, _)), 541 debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]), 542 thread_signal(Thread, throw(http_stop)), 543 catch(connect(localhost:Port), _, true), 544 thread_join(Thread, _0Status), 545 debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]), 546 message_queue_destroy(Queue). 547 548connect(Address) :- 549 setup_call_cleanup( 550 tcp_socket(Socket), 551 tcp_connect(Socket, Address), 552 tcp_close_socket(Socket)).
560http_enough_workers(Queue, _Why, _Peer) :- 561 message_queue_property(Queue, waiting(_0)), 562 !, 563 debug(http(scheduler), '~D waiting for work; ok', [_0]). 564http_enough_workers(Queue, Why, Peer) :- 565 message_queue_property(Queue, size(Size)), 566 ( enough(Size, Why) 567 -> debug(http(scheduler), '~D in queue; ok', [Size]) 568 ; current_server(Port, _, _, Queue, _, _), 569 Data = _{ port:Port, 570 reason:Why, 571 peer:Peer, 572 waiting:Size, 573 queue:Queue 574 }, 575 debug(http(scheduler), 'Asking to reschedule: ~p', [Data]), 576 catch(http:schedule_workers(Data), 577 Error, 578 print_message(error, Error)) 579 -> true 580 ; true 581 ). 582 583enough(0, _). 584enough(1, keep_alive). % I will be ready myself
accept
for a new connection or keep_alive
if a
worker tries to reschedule itself.
Note that, when called with reason:accept
, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to thread dedicated to
dynamic worker-pool management.
614 /******************************* 615 * WORKER QUEUE OPERATIONS * 616 *******************************/
623create_workers(Options) :- 624 option(workers(N), Options, 5), 625 option(queue(Queue), Options), 626 catch(message_queue_create(Queue), _, true), 627 atom_concat(Queue, '_', AliasBase), 628 create_workers(1, N, Queue, AliasBase, Options), 629 assert(queue_options(Queue, Options)). 630 631create_workers(I, N, _, _, _) :- 632 I > N, 633 !. 634create_workers(I, N, Queue, AliasBase, Options) :- 635 gensym(AliasBase, Alias), 636 thread_create(http_worker(Options), Id, 637 [ alias(Alias) 638 | Options 639 ]), 640 assertz(queue_worker(Queue, Id)), 641 I2 is I + 1, 642 create_workers(I2, N, Queue, AliasBase, Options).
650resize_pool(Queue, Size) :-
651 findall(W, queue_worker(Queue, W), Workers),
652 length(Workers, Now),
653 ( Now < Size
654 -> queue_options(Queue, Options),
655 atom_concat(Queue, '_', AliasBase),
656 I0 is Now+1,
657 create_workers(I0, Size, Queue, AliasBase, Options)
658 ; Now == Size
659 -> true
660 ; Now > Size
661 -> Excess is Now - Size,
662 thread_self(Me),
663 forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
664 forall(between(1, Excess, _), thread_get_message(quitted(_)))
665 ).
If the message quit(Sender)
is read from the queue, the worker
stops.
676http_worker(Options) :- 677 debug(http(scheduler), 'New worker', []), 678 prolog_listen(this_thread_exit, done_worker), 679 option(queue(Queue), Options), 680 option(max_idle_time(MaxIdle), Options, infinite), 681 thread_repeat_wait(get_work(Queue, Message, MaxIdle)), 682 debug(http(worker), 'Waiting for a job ...', []), 683 debug(http(worker), 'Got job ~p', [Message]), 684 ( Message = quit(Sender) 685 -> !, 686 thread_self(Self), 687 thread_detach(Self), 688 ( Sender == idle 689 -> true 690 ; retract(queue_worker(Queue, Self)), 691 thread_send_message(Sender, quitted(Self)) 692 ) 693 ; open_client(Message, Queue, Goal, In, Out, 694 Options, ClientOptions), 695 ( catch(http_process(Goal, In, Out, ClientOptions), 696 Error, true) 697 -> true 698 ; Error = goal_failed(http_process/4) 699 ), 700 ( var(Error) 701 -> fail 702 ; current_message_level(Error, Level), 703 print_message(Level, Error), 704 memberchk(peer(Peer), ClientOptions), 705 close_connection(Peer, In, Out), 706 fail 707 ) 708 ). 709 710get_work(Queue, Message, infinite) :- 711 !, 712 thread_get_message(Queue, Message). 713get_work(Queue, Message, MaxIdle) :- 714 ( thread_get_message(Queue, Message, [timeout(MaxIdle)]) 715 -> true 716 ; Message = quit(idle) 717 ).
726open_client(requeue(In, Out, Goal, ClOpts), 727 _, Goal, In, Out, Opts, ClOpts) :- 728 !, 729 memberchk(peer(Peer), ClOpts), 730 option(keep_alive_timeout(KeepAliveTMO), Opts, 2), 731 check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out). 732open_client(Message, Queue, Goal, In, Out, Opts, 733 [ pool(client(Queue, Goal, In, Out)), 734 timeout(Timeout) 735 | Options 736 ]) :- 737 catch(open_client(Message, Goal, In, Out, Options, Opts), 738 E, report_error(E)), 739 option(timeout(Timeout), Opts, 60), 740 ( debugging(http(connection)) 741 -> memberchk(peer(Peer), Options), 742 debug(http(connection), 'Opened connection from ~p', [Peer]) 743 ; true 744 ).
750open_client(Message, Goal, In, Out, ClientOptions, Options) :- 751 open_client_hook(Message, Goal, In, Out, ClientOptions, Options), 752 !. 753open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out, 754 [ peer(Peer), 755 protocol(http) 756 ], _) :- 757 tcp_open_socket(Socket, In, Out). 758 759report_error(E) :- 760 print_message(error, E), 761 fail.
770check_keep_alive_connection(In, TMO, Peer, In, Out) :-
771 stream_property(In, timeout(Old)),
772 set_stream(In, timeout(TMO)),
773 debug(http(keep_alive), 'Waiting for keep-alive ...', []),
774 catch(peek_code(In, Code), E, true),
775 ( var(E), % no exception
776 Code \== -1 % no end-of-file
777 -> set_stream(In, timeout(Old)),
778 debug(http(keep_alive), '\tre-using keep-alive connection', [])
779 ; ( Code == -1
780 -> debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
781 ; debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
782 ),
783 close_connection(Peer, In, Out),
784 fail
785 ).
794done_worker :- 795 thread_self(Self), 796 thread_detach(Self), 797 retract(queue_worker(Queue, Self)), 798 thread_property(Self, status(Status)), 799 !, 800 ( catch(recreate_worker(Status, Queue), _, fail) 801 -> print_message(informational, 802 httpd_restarted_worker(Self)) 803 ; done_status_message_level(Status, Level), 804 print_message(Level, 805 httpd_stopped_worker(Self, Status)) 806 ). 807done_worker :- % received quit(Sender) 808 thread_self(Self), 809 thread_property(Self, status(Status)), 810 done_status_message_level(Status, Level), 811 print_message(Level, 812 httpd_stopped_worker(Self, Status)). 813 814done_status_message_level(true, silent) :- !. 815done_status_message_level(exception('$aborted'), silent) :- !. 816done_status_message_level(_, informational).
The first clause deals with the possibility that we cannot write to
user_error
. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
831recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :- 832 halt(2). 833recreate_worker(exception(Error), Queue) :- 834 recreate_on_error(Error), 835 queue_options(Queue, Options), 836 atom_concat(Queue, '_', AliasBase), 837 create_workers(1, 1, Queue, AliasBase, Options). 838 839recreate_on_error('$aborted'). 840recreate_on_error(time_limit_exceeded).
849:- multifile 850 message_level/2. 851 852message_level(error(io_error(read, _), _), silent). 853message_level(error(socket_error(epipe,_), _), silent). 854message_level(error(http_write_short(_Obj,_Written), _), silent). 855message_level(error(timeout_error(read, _), _), informational). 856message_level(keep_alive_timeout, silent). 857 858current_message_level(Term, Level) :- 859 ( message_level(Term, Level) 860 -> true 861 ; Level = error 862 ).
870http_requeue(Header) :- 871 requeue_header(Header, ClientOptions), 872 memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions), 873 memberchk(peer(Peer), ClientOptions), 874 http_enough_workers(Queue, keep_alive, Peer), 875 thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)), 876 !. 877http_requeue(Header) :- 878 debug(http(error), 'Re-queue failed: ~p', [Header]), 879 fail. 880 881requeue_header([], []). 882requeue_header([H|T0], [H|T]) :- 883 requeue_keep(H), 884 !, 885 requeue_header(T0, T). 886requeue_header([_|T0], T) :- 887 requeue_header(T0, T). 888 889requeue_keep(pool(_)). 890requeue_keep(peer(_)). 891requeue_keep(protocol(_)).
898http_process(Goal, In, Out, Options) :- 899 debug(http(server), 'Running server goal ~p on ~p -> ~p', 900 [Goal, In, Out]), 901 option(timeout(TMO), Options, 60), 902 set_stream(In, timeout(TMO)), 903 set_stream(Out, timeout(TMO)), 904 http_wrapper(Goal, In, Out, Connection, 905 [ request(Request) 906 | Options 907 ]), 908 next(Connection, Request). 909 910next(Connection, Request) :- 911 next_(Connection, Request), !. 912next(Connection, Request) :- 913 print_message(warning, goal_failed(next(Connection,Request))). 914 915next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :- 916 !, 917 memberchk(pool(client(_Queue, _Goal, In, Out)), Request), 918 ( catch(call(SwitchGoal, In, Out), E, 919 ( print_message(error, E), 920 fail)) 921 -> true 922 ; http_close_connection(Request) 923 ). 924next_(spawned(ThreadId), _) :- 925 !, 926 debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]). 927next_(Connection, Request) :- 928 downcase_atom(Connection, 'keep-alive'), 929 http_requeue(Request), 930 !. 931next_(_, Request) :- 932 http_close_connection(Request).
939http_close_connection(Request) :-
940 memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
941 memberchk(peer(Peer), Request),
942 close_connection(Peer, In, Out).
949close_connection(Peer, In, Out) :-
950 debug(http(connection), 'Closing connection from ~p', [Peer]),
951 catch(close(In, [force(true)]), _, true),
952 catch(close(Out, [force(true)]), _, true).
If a pool does not exist, this predicate calls the multifile hook http:create_pool/1 to create it. If this predicate succeeds the operation is retried.
970http_spawn(Goal, Options) :- 971 select_option(pool(Pool), Options, ThreadOptions), 972 !, 973 current_output(CGI), 974 catch(thread_create_in_pool(Pool, 975 wrap_spawned(CGI, Goal), Id, 976 [ detached(true) 977 | ThreadOptions 978 ]), 979 Error, 980 true), 981 ( var(Error) 982 -> http_spawned(Id) 983 ; Error = error(resource_error(threads_in_pool(_)), _) 984 -> throw(http_reply(busy)) 985 ; Error = error(existence_error(thread_pool, Pool), _), 986 create_pool(Pool) 987 -> http_spawn(Goal, Options) 988 ; throw(Error) 989 ). 990http_spawn(Goal, Options) :- 991 current_output(CGI), 992 thread_create(wrap_spawned(CGI, Goal), Id, 993 [ detached(true) 994 | Options 995 ]), 996 http_spawned(Id). 997 998wrap_spawned(CGI, Goal) :- 999 set_output(CGI), 1000 http_wrap_spawned(Goal, Request, Connection), 1001 next(Connection, Request).
1011create_pool(Pool) :- 1012 E = error(permission_error(create, thread_pool, Pool), _), 1013 catch(http:create_pool(Pool), E, true). 1014create_pool(Pool) :- 1015 print_message(informational, httpd(created_pool(Pool))), 1016 thread_pool_create(Pool, 10, []). 1017 1018 1019 /******************************* 1020 * WAIT POLICIES * 1021 *******************************/ 1022 1023:- meta_predicate 1024 thread_repeat_wait( ).
repeat, thread_idle(Goal)
, choosing whether to use a
long
or short
idle time based on the average firing rate.1031thread_repeat_wait(Goal) :- 1032 new_rate_mma(5, 1000, State), 1033 repeat, 1034 update_rate_mma(State, MMA), 1035 long(MMA, IsLong), 1036 ( IsLong == brief 1037 -> call(Goal) 1038 ; thread_idle(Goal, IsLong) 1039 ). 1040 1041long(MMA, brief) :- 1042 MMA < 0.05, 1043 !. 1044long(MMA, short) :- 1045 MMA < 1, 1046 !. 1047long(_, long).
1061new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :- 1062 current_prolog_flag(max_tagged_integer, MaxI), 1063 get_time(Base). 1064 1065update_rate_mma(State, MMAr) :- 1066 State = rstate(Base, Last, MaxI, Resolution, N, MMA0), 1067 get_time(Now), 1068 Stamp is round((Now-Base)*Resolution), 1069 ( Stamp > MaxI 1070 -> nb_setarg(1, State, Now), 1071 nb_setarg(2, State, 0) 1072 ; true 1073 ), 1074 Diff is Stamp-Last, 1075 nb_setarg(2, State, Stamp), 1076 MMA is round(((N-1)*MMA0+Diff)/N), 1077 nb_setarg(6, State, MMA), 1078 MMAr is MMA/float(Resolution). 1079 1080 1081 /******************************* 1082 * MESSAGES * 1083 *******************************/ 1084 1085:- multifile 1086 prolog:message/3. 1087 1088prologmessage(httpd_started_server(Port, Options)) --> 1089 [ 'Started server at '-[] ], 1090 http_root(Port, Options). 1091prologmessage(httpd_stopped_worker(Self, Status)) --> 1092 [ 'Stopped worker ~p: ~p'-[Self, Status] ]. 1093prologmessage(httpd_restarted_worker(Self)) --> 1094 [ 'Replaced aborted worker ~p'-[Self] ]. 1095prologmessage(httpd(created_pool(Pool))) --> 1096 [ 'Created thread-pool ~p of size 10'-[Pool], nl, 1097 'Create this pool at startup-time or define the hook ', nl, 1098 'http:create_pool/1 to avoid this message and create a ', nl, 1099 'pool that fits the usage-profile.' 1100 ]. 1101 1102http_root(Address, Options) --> 1103 { landing_page(Address, URI, Options) }, 1104 [ '~w'-[URI] ]. 1105 1106landing_page(Host:Port, URI, Options) :- 1107 !, 1108 must_be(atom, Host), 1109 must_be(integer, Port), 1110 http_server_property(Port, scheme(Scheme)), 1111 ( default_port(Scheme, Port) 1112 -> format(atom(Base), '~w://~w', [Scheme, Host]) 1113 ; format(atom(Base), '~w://~w:~w', [Scheme, Host, Port]) 1114 ), 1115 entry_page(Base, URI, Options). 1116landing_page(unix_socket(Path), URI, _Options) :- 1117 !, 1118 format(string(URI), 'Unix domain socket "~w"', [Path]). 1119landing_page(Port, URI, Options) :- 1120 landing_page(localhost:Port, URI, Options). 1121 1122default_port(http, 80). 1123default_port(https, 443). 1124 1125entry_page(Base, URI, Options) :- 1126 option(entry_page(Entry), Options), 1127 !, 1128 uri_resolve(Entry, Base, URI). 1129entry_page(Base, URI, _) :- 1130 http_absolute_location(root(.), Entry, []), 1131 uri_resolve(Entry, Base, URI)
Threaded HTTP server
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially uses a lot of memory and 8 requests that use a lot of CPU resources.
On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.
Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */