1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2014-2020, VU University Amsterdam 7 CWI Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(hub, 37 [ hub_create/3, % +HubName, -Hub, +Options 38 hub_add/3, % +HubName, +Websocket, ?Id 39 hub_member/2, % +HubName, ?Id 40 hub_send/2, % +ClientId, +Message 41 hub_broadcast/2, % +HubName, +Message 42 hub_broadcast/3, % +HubName, +Message, +Condition 43 current_hub/2 % ?HubName, ?Hub 44 ]). 45:- use_module(library(debug)). 46:- use_module(library(error)). 47:- use_module(library(apply)). 48:- use_module(library(gensym)). 49:- if(exists_source(library(uuid))). 
:- use_module(library(uuid)).
:- endif.
:- use_module(library(ordsets)).
:- use_module(library(http/websocket)).

% hub_broadcast/3 runs its last argument as call(Condition, Id), i.e.,
% it is a closure that is called with one additional argument.
:- meta_predicate
    hub_broadcast(+,+,1).
% Dynamic tables of this module:
%
%   - hub(Hub, Queues ...)
%   - websocket(Hub, Socket, Queue, Lock, Id)

:- dynamic
    hub/2,
    websocket/5.

:- volatile
    hub/2,
    websocket/5.
thread(s)
can listen. After creating a hub, the application normally creates a thread that listens to Hub.queues.event and exposes some mechanisms to establish websockets and add them to the hub using hub_add/3.
%!  hub_create(+HubName, -Hub, +Options)
%
%   Create a hub named HubName and record it as hub(HubName, Hub).
%   Hub is a dict with tag `hub` holding
%
%     - name:   HubName
%     - queues: a dict with four newly created message queues under
%               the keys `wait`, `ready`, `event` and `broadcast`.
%
%   Options is currently ignored.  HubName is checked using
%   must_be(atom, HubName).

hub_create(HubName, Hub, _Options) :-
    must_be(atom, HubName),
    % Same creation order as the keys below.
    maplist(message_queue_create, [Wait, Ready, Event, Broadcast]),
    Queues = _{wait:Wait,
               ready:Ready,
               event:Event,
               broadcast:Broadcast
              },
    Hub = hub{name:HubName, queues:Queues},
    assertz(hub(HubName, Hub)).
%!  current_hub(?HubName, ?Hub) is nondet.
%
%   True when hub(HubName, Hub) is recorded in the hub/2 table.

current_hub(HubName, Hub) :-
    hub(HubName, Hub).


                 /*******************************
                 *            WAITERS           *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The task of this layer is to wait for (a potentially large number of)
websockets. Whenever there is data on one of these sockets, the socket
is handed to Hub.queues.ready. This is realised using wait_for_input/3,
which allows a single thread to wait for many sockets. But ... on
Windows it allows to wait for at most 64 sockets. In addition, there is
no way to add an additional input for control messages because Windows
select() can only wait for sockets. On Unix we could use pipe/2 to add
the control channel. On Windows we would need an additional network
service, giving rise to its own problems with allocation, firewalls and
security.

So, instead we keep a queue of websockets that need to be waited for.
Whenever we add a websocket, we create a waiter thread that will
typically start waiting for this socket. In addition, we schedule any
waiting thread that has less than the maximum number of sockets to
time out, as well as we can, at the same time. All of them will hunt
for the same set of queues, but they have to wait for each other and
therefore most of the time one thread will walk away with all websockets
and the others terminate because there is nothing to wait for.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

% The first argument of hub_thread/3 is the goal that is handed to
% thread_create/3; the other two are plain data.
:- meta_predicate
    hub_thread(0,+,+).
%!  hub_add(+HubName, +WebSocket, ?Id)
%
%   Add WebSocket to the hub HubName.  If Id is unbound it is bound
%   to a fresh identifier using uuid/1.  The socket gets its own
%   output queue and mutex, is recorded in websocket/5 and is sent
%   to Hub.queues.wait.  A message hub{joined:Id} is sent to
%   Hub.queues.event and finally a wait thread is created.  Fails if
%   there is no hub/2 fact for HubName.

hub_add(HubName, WebSocket, Id) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    (   nonvar(Id)
    ->  true
    ;   uuid(Id)
    ),
    mutex_create(Lock),
    message_queue_create(OutputQueue),
    % asserta/1 allows for reuse of Id
    asserta(websocket(HubName, WebSocket, OutputQueue, Lock, Id)),
    Queues = Hub.queues,
    thread_send_message(Queues.wait, WebSocket),
    thread_send_message(Queues.event, hub{joined:Id}),
    debug(hub(gate), 'Joined ~w: ~w', [HubName, Id]),
    create_wait_thread(Hub).
%!  hub_member(?HubName, ?Id) is nondet.
%
%   True when Id is the identifier of a websocket that is recorded
%   in websocket/5 for the hub HubName.

hub_member(HubName, Id) :-
    websocket(HubName, _WebSocket, _OutputQueue, _Lock, Id).

:- if(\+current_predicate(uuid/1)).
% FIXME: Proper pure Prolog random UUID implementation
uuid(UUID) :-
    A is random(1<<63),
    format(atom(UUID), '~d', [A]).
:- endif.

%   create_wait_thread(+Hub)
%
%   Start a thread that runs wait_for_sockets/1 for Hub.

create_wait_thread(Hub) :-
    hub_thread(wait_for_sockets(Hub), Hub, hub_wait_).

%   wait_for_sockets(+Hub) is det.
%   wait_for_sockets(+Hub, +Max)
%
%   Repeatedly take up to Max (default 64) websockets from
%   Hub.queues.wait and wait for input on them.  Sockets that have
%   data are sent to Hub.queues.ready and reader threads are created
%   for them; the others are sent back to Hub.queues.wait.  The loop
%   ends when no socket could be taken from the wait queue or when
%   nothing needs to be re-scheduled.

wait_for_sockets(Hub) :-
    wait_for_sockets(Hub, 64).

wait_for_sockets(Hub, Max) :-
    Queues = Hub.queues,
    repeat,
      get_messages(Queues.wait, Max, List),
      (   List \== []
      ->  create_new_waiter_if_needed(Hub),
          sort(List, Set),
          (   debugging(hub(wait))
          ->  length(Set, Len),
              debug(hub(wait), 'Waiting for ~d queues', [Len])
          ;   true
          ),
          wait_for_set(Set, Left, ReadySet, Max),
          (   ReadySet \== []
          ->  debug(hub(ready), 'Data on ~p', [ReadySet]),
              Ready = Queues.ready,
              maplist(thread_send_message(Ready), ReadySet),
              create_reader_threads(Hub),
              % Subtract from Left rather than Set: Left no longer
              % holds the streams wait_for_set/4 removed because
              % they do not exist anymore, so these are not put
              % back on the wait queue.  Left is Set with elements
              % deleted and thus still an ordered set.
              ord_subtract(Left, ReadySet, NotReadySet)
          ;   NotReadySet = Left             % timeout
          ),
          (   NotReadySet \== []
          ->  debug(hub(wait), 'Re-scheduling: ~p', [NotReadySet]),
              Wait = Queues.wait,
              maplist(thread_send_message(Wait), NotReadySet),
              fail                           % next round of the loop
          ;   true
          )
      ;   !
      ).

%   create_new_waiter_if_needed(+Hub)
%
%   Create an additional wait thread unless Hub.queues.wait is empty.

create_new_waiter_if_needed(Hub) :-
    message_queue_property(Hub.queues.wait, size(0)),
    !.
create_new_waiter_if_needed(Hub) :-
    create_wait_thread(Hub).
%   wait_for_set(+Set0, -Left, -ReadySet, +Max)
%
%   Wait for input on the streams in Set0 using wait_for_input/3,
%   with the timeout computed by wait_timeout/3.  If the wait raises
%   existence_error(stream, S), S is deleted from the set and the
%   wait is restarted on the remainder.  Left is the set that was
%   eventually waited for and ReadySet are the streams with input.
%   Waiting for the empty set yields two empty lists immediately.

wait_for_set([], [], [], _) :-
    !.
wait_for_set(Set0, Set, ReadySet, Max) :-
    wait_timeout(Set0, Max, Timeout),
    catch(wait_for_input(Set0, ReadySet, Timeout),
          error(existence_error(stream, Gone), _),
          true),
    (   nonvar(Gone)                    % the wait raised the error
    ->  delete(Set0, Gone, Remaining),
        wait_for_set(Remaining, Set, ReadySet, Max)
    ;   Set = Set0
    ).
:- dynamic
    scheduled_timeout/1.

%   wait_timeout(+List, +Max, -Timeout)
%
%   Timeout is `infinite` if List holds exactly Max elements.
%   Otherwise it is the number of seconds from now until the time
%   stamp kept in scheduled_timeout/1.  If there is no such time
%   stamp, or it is not in the future, a new one is recorded at
%   ceiling(Now)+1.

wait_timeout(List, Max, Timeout) :-
    (   length(List, Max)
    ->  Timeout = infinite
    ;   get_time(Now),
        next_wakeup(Now, At),
        Timeout is At - Now
    ).

%   next_wakeup(+Now, -At)
%
%   At is the scheduled time stamp if it lies after Now, else a
%   newly scheduled one that replaces whatever was recorded.

next_wakeup(Now, At) :-
    scheduled_timeout(SchedAt),
    !,
    (   SchedAt > Now
    ->  At = SchedAt
    ;   retractall(scheduled_timeout(_)),
        schedule_wakeup(Now, At)
    ).
next_wakeup(Now, At) :-
    schedule_wakeup(Now, At).

schedule_wakeup(Now, At) :-
    At is ceiling(Now) + 1,
    asserta(scheduled_timeout(At)).
%   get_messages(+Queue, +Max, -List)
%
%   List holds at most Max messages taken from Queue.  The messages
%   are collected while holding the mutex `hub_wait`.

get_messages(Q, N, List) :-
    with_mutex(hub_wait,
               get_messages_sync(Q, N, List)).

%   get_messages_sync(+Queue, +Max, -List)
%
%   Take messages from Queue until Max messages have been collected
%   or no message arrives within 0.01 seconds.

get_messages_sync(Q, N, List) :-
    (   succ(Remaining, N),             % fails if N is 0
        thread_get_message(Q, Msg, [timeout(0.01)])
    ->  List = [Msg|More],
        get_messages_sync(Q, Remaining, More)
    ;   List = []
    ).


                 /*******************************
                 *            READERS           *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
The next layer consists of `readers'. Whenever one or more websockets
have data, the socket is added to Hub.queues.ready and
create_reader_threads/1 is called. This examines the number of ready
sockets and fires a number of threads to handle the read requests.
Multiple threads are mainly needed for the case that a client signals to
be ready, but only provides an incomplete message, causing the
ws_receive/2 to block.

Each of the threads reads the next message and sends this to
Hub.queues.event. The websocket is then rescheduled to listen for new
events. The reader either fires a thread to listen for the new waiting
socket using create_wait_thread/1 or, if there are no more websockets,
does this job itself. This deals with the common scenario that one
client wakes up, starts a thread to read its event and waits for new
messages on the same websockets.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

%   create_reader_threads(+Hub)
%
%   Create ceiling(sqrt(N)) reader threads, where N is the current
%   size of Hub.queues.ready.

create_reader_threads(Hub) :-
    message_queue_property(Hub.queues.ready, size(ReadyCount)),
    ThreadCount is ceiling(sqrt(ReadyCount)),
    forall(between(1, ThreadCount, _),
           create_reader_thread(Hub)).

%   create_reader_thread(+Hub)
%
%   Start a thread that runs read_message/1 for Hub.

create_reader_thread(Hub) :-
    hub_thread(read_message(Hub), Hub, hub_read_ws_).
%   read_message(+Hub)
%
%   Take websockets from Hub.queues.ready until this queue is empty
%   and read one message from each using ws_receive/2.
%
%     - A message whose opcode is `close` is handled by
%       close_client/2.
%     - Any other message is extended with the keys `client` and
%       `hub`, sent to Hub.queues.event and the websocket is sent
%       back to Hub.queues.wait.  If no more sockets are ready, this
%       thread continues as waiter using wait_for_sockets/1.
%       Otherwise it creates a wait thread and continues reading.
%     - If ws_receive/2 raised an exception and the websocket is
%       still registered, it is handled by io_read_error/2.
%
%   Succeeds immediately if the ready queue is empty.

read_message(Hub) :-
    Queues = Hub.queues,
    thread_get_message(Queues.ready, WS, [timeout(0)]),
    !,
    catch(ws_receive(WS, Message), Error, true),
    (   var(Error),
        websocket(HubName, WS, _, _, Id)
    ->  (   Message.get(opcode) == close
        ->  close_client(WS, Message),
            % Like the error branches below, keep draining the ready
            % queue such that sockets that are ready are not left
            % behind if this is the last reader.
            read_message(Hub)
        ;   Event = Message.put(_{client:Id, hub:HubName}),
            debug(hub(event), 'Event: ~p', [Event]),
            thread_send_message(Queues.event, Event),
            % The opcode cannot be `close` here, so the socket is
            % always rescheduled for waiting.
            thread_send_message(Queues.wait, WS),
            (   message_queue_property(Queues.ready, size(0))
            ->  !,
                wait_for_sockets(Hub)
            ;   create_wait_thread(Hub),
                read_message(Hub)
            )
        )
    ;   websocket(_, WS, _, _, _)
    ->  io_read_error(WS, Error),
        read_message(Hub)
    ;   read_message(Hub)                   % already destroyed
    ).
read_message(_).

%   ws_warning(+Error)
%
%   Print Error as a warning, unless its formal part is considered
%   silent by silent/1.

ws_warning(error(Formal, _)) :-
    silent(Formal),
    !.
ws_warning(Error) :-
    print_message(warning, Error).

silent(socket_error(epipe, _)).
%   io_read_error(+WebSocket, +Error)
%
%   Called if reading from WebSocket resulted in Error.  Removes the
%   websocket from websocket/5, closes it with code 1011 and sends a
%   message hub{left:Id, hub:HubName, reason:read, error:Error} to
%   the event queue of the hub.  Succeeds without doing anything if
%   the websocket was already removed.

io_read_error(WebSocket, Error) :-
    debug(hub(gate), 'Got read error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1011, Error), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:read,
                            error:Error}).
io_read_error(_, _).                      % already considered gone

%   close_client(+WebSocket, +Message)
%
%   Handle a `close` Message received on WebSocket.  If the data of
%   the message is `end_of_file`, this is handled as a read error.
%   Otherwise the websocket is removed from websocket/5, closed with
%   code 1000 and a message hub{left:Id, hub:HubName, reason:close,
%   data:Data} is sent to the event queue of the hub.  Succeeds
%   without doing anything if the websocket was already removed.

close_client(WebSocket, Message) :-
    Message.get(data) == end_of_file,
    !,
    io_read_error(WebSocket, end_of_file).
close_client(WebSocket, Message) :-
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    E = error(_,_),
    catch(ws_close(WebSocket, 1000, "Bye"), E,
          ws_warning(E)),
    hub(HubName, Hub),
    thread_send_message(Hub.queues.event,
                        hub{left:Id,
                            hub:HubName,
                            reason:close,
                            data:Message.data
                           }).
% Same fallback as io_read_error/2: if another thread removed the
% websocket first, the reader thread must not fail.
close_client(_, _).                       % already considered gone
%   io_write_error(+WebSocket, +Message, +Error)
%
%   Called if sending Message to WebSocket resulted in Error.  The
%   websocket is removed from websocket/5 and closed with code 1011.
%   Unless the Id is already in use by another websocket, we send a
%   `left` message and pass the error such that the client can
%   re-send it when appropriate.  The message has the shape
%   hub{left:Id, hub:HubName, reason:write(Message), error:Error}.
%   Succeeds without doing anything if the websocket was already
%   removed.

io_write_error(WebSocket, Message, Error) :-
    debug(hub(gate), 'Got write error on ~w: ~p',
          [WebSocket, Error]),
    retract(websocket(HubName, WebSocket, _Queue, _Lock, Id)),
    !,
    catch(ws_close(WebSocket, 1011, Error), _, true),
    (   \+ websocket(_, _, _, _, Id)        % Id is not reused
    ->  hub(HubName, Hub),
        Left = hub{left:Id,
                   hub:HubName,
                   reason:write(Message),
                   error:Error},
        thread_send_message(Hub.queues.event, Left)
    ;   true
    ).
io_write_error(_, _, _).                  % already considered gone


                 /*******************************
                 *        SENDING MESSAGES      *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
My initial thought about sending messages was to add a tuple
WebSocket-Message to an output queue and have a dynamic number of
threads sending these messages to the websockets. But, it is desirable
that, if multiple messages are sent to a particular client, they arrive
in this order. As multiple threads are performing this task, this is not
easy to guarantee. Therefore, we create an output queue and a mutex for
each client. An output thread will walk along the websockets, looking
for one that has pending messages. It then grabs the lock associated
with the client and sends all waiting output messages.

The price is that we might peek a significant number of message queues
before we find one that contains messages. If this proves to be a
significant problem, we could maintain a queue of queues holding
messages.
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
%!  hub_send(+ClientId, +Message)
%
%   Queue Message on the output queue of the websocket that is
%   registered as ClientId and create an output thread for this
%   queue.  If Message is a list, its elements are queued one by
%   one.  Fails if ClientId is not in websocket/5.

hub_send(ClientId, Message) :-
    websocket(HubName, _WS, ClientQueue, _Lock, ClientId),
    hub(HubName, Hub),
    (   is_list(Message)
    ->  Messages = Message
    ;   Messages = [Message]
    ),
    maplist(queue_output(ClientQueue), Messages),
    create_output_thread(Hub, ClientQueue).

%   create_output_thread(+Hub, +Queue)
%
%   Start a thread that sends the messages waiting in Queue.

create_output_thread(Hub, Queue) :-
    SendGoal = broadcast_from_queue(Queue, [timeout(0)]),
    hub_thread(SendGoal, Hub, hub_out_q_).
%!  hub_broadcast(+HubName, +Message) is det.
%!  hub_broadcast(+HubName, +Message, :Condition) is det.
%
%   Send Message to all websockets of the hub HubName for which
%   call(Condition, Id) succeeds.  hub_broadcast/2 sends to all
%   websockets of the hub.  Note that this process is asynchronous:
%   this predicate returns immediately after putting Message in the
%   output queue of each selected websocket and creating threads
%   that send the queued messages.

hub_broadcast(HubName, Message) :-
    hub_broadcast(HubName, Message, all).

%   Condition that accepts every client.

all(_).

hub_broadcast(HubName, Message, Condition) :-
    must_be(atom, HubName),
    hub(HubName, Hub),
    Counter = count(0),
    forall(( websocket(HubName, _WS, ClientQueue, _Lock, Id),
             call(Condition, Id)
           ),
           ( queue_output(ClientQueue, Message),
             inc_count(Counter)
           )),
    arg(1, Counter, Count),
    create_broadcast_threads(Hub, Count).

%   queue_output(+Queue, +Message)
%
%   Add Message to the message queue Queue.

queue_output(Queue, Message) :-
    thread_send_message(Queue, Message).

%   inc_count(!State)
%
%   Destructively add one to the first argument of State.  The
%   update is not undone on backtracking.

inc_count(State) :-
    arg(1, State, Old),
    New is Old+1,
    nb_setarg(1, State, New).

%   create_broadcast_threads(+Hub, +Count)
%
%   Create ceiling(sqrt(Count)) threads to send queued messages.

create_broadcast_threads(Hub, Count) :-
    ThreadCount is ceiling(sqrt(Count)),
    forall(between(1, ThreadCount, _),
           create_broadcast_thread(Hub)).

create_broadcast_thread(Hub) :-
    current_prolog_flag(threads, true),
    !,
    hub_thread(broadcast_from_queues(Hub, [timeout(0)]),
               Hub, hub_out_all_).
create_broadcast_thread(_).             % we are shutting down
%   broadcast_from_queues(+Hub, +Options)
%
%   Run broadcast_from_queue/2 on the output queue of every
%   websocket that is registered for Hub.  Options is passed on
%   unchanged.

broadcast_from_queues(Hub, Options) :-
    HubName = Hub.name,
    forall(websocket(HubName, _WebSocket, Queue, _Lock, _Id),
           broadcast_from_queue(Queue, Options)).
%   broadcast_from_queue(+Queue, +Options)
%
%   Send the messages waiting in Queue to the websocket that owns
%   the queue.  Nothing is done if Queue is empty, if no websocket
%   is registered for Queue or if the lock of the websocket cannot
%   be obtained using mutex_trylock/1.  Always succeeds.

broadcast_from_queue(Queue, _Options) :-
    message_queue_property(Queue, size(0)),
    !.
broadcast_from_queue(Queue, Options) :-
    websocket(_Hub, _WebSocket, Queue, Lock, _Id),
    !,
    ignore(setup_call_cleanup(
               mutex_trylock(Lock),
               broadcast_from_queue_sync(Queue, Options),
               mutex_unlock(Lock))).
broadcast_from_queue(_, _).

% Note that we re-fetch websocket/5, such that we terminate if something
% closed the websocket.

%   broadcast_from_queue_sync(+Queue, +Options)
%
%   Send messages from Queue to its websocket using ws_send/2 until
%   the websocket is no longer registered or thread_get_message/3
%   using Options yields no message.  Exceptions from ws_send/2 are
%   handed to io_write_error/3.

broadcast_from_queue_sync(Queue, Options) :-
    repeat,
      (   websocket(_Hub, WebSocket, Queue, _Lock, _Id),
          thread_get_message(Queue, Message, Options)
      ->  debug(hub(broadcast),
                'To: ~p messages: ~p', [WebSocket, Message]),
          catch(ws_send(WebSocket, Message), SendError,
                io_write_error(WebSocket, Message, SendError)),
          fail                            % try the next message
      ;   !
      ).
%   hub_thread(:Goal, +Hub, +Task) is det.
%
%   Create a detached thread that runs Goal.  The thread gets an
%   alias that is generated from Task using gensym/2 if the debug
%   topic hub(thread) is enabled.  Hub is not used.

hub_thread(Goal, _, Task) :-
    debugging(hub(thread)),
    !,
    gensym(Task, Alias),
    thread_create(Goal, _, [detached(true), alias(Alias)]).
hub_thread(Goal, _, _) :-
    thread_create(Goal, _, [detached(true)]).
Manage a hub for websockets
This library manages a hub that consists of clients that are connected using a websocket. Messages arriving at any of the websockets are sent to the event queue of the hub. In addition, the hub provides a broadcast interface. A typical usage scenario for a hub is a chat server. A scenario for realizing a chat server is:
read
or write
and Error is the Prolog I/O exception. The
thread(s)
can talk to clients using two predicates: A hub consists of (currently) four message queues and a simple dynamic fact. Threads that are needed for the communication tasks are created on demand and die if no more work needs to be done.