1/* Part of SWI-Prolog 2 3 Author: Jeffrey Rosenwald and Jan Wielemaker 4 E-mail: jeffrose@acm.org 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2012-2013, Jeffrey Rosenwald 7 2018-2020, CWI Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(udp_broadcast, 37 [ udp_broadcast_initialize/2, % +IPAddress, +Options 38 udp_broadcast_close/1, % +Scope 39 40 udp_peer_add/2, % +Scope, +IP 41 udp_peer_del/2, % +Scope, ?IP 42 udp_peer/2 % +Scope, -IP 43 ]). 44:- autoload(library(apply),[maplist/2,maplist/3]). 45:- autoload(library(backcomp),[thread_at_exit/1]). 46:- autoload(library(broadcast), 47 [broadcast_request/1,broadcast/1,listening/3,listen/3]). 48:- use_module(library(debug),[debug/3]). 49:- autoload(library(error), 50 [must_be/2,syntax_error/1,domain_error/2,existence_error/2]). 51:- autoload(library(option),[option/3]). 52:- autoload(library(socket), 53 [ tcp_close_socket/1, 54 udp_socket/1, 55 tcp_bind/2, 56 tcp_getopt/2, 57 tcp_setopt/2, 58 udp_receive/4, 59 udp_send/4 60 ]). 61 62 63% :- debug(udp(broadcast)).
279:- multifile 280 udp_term_string_hook/3, % +Scope, ?Term, ?String 281 udp_unicast_join_hook/3, % +Scope, +From, +Data 282 black_list/1. % +Term 283 284:- meta_predicate 285 safely( ), 286 safely_det( ). 287 288safely(Predicate) :- 289 Err = error(_,_), 290 catch(Predicate, Err, 291 print_message_fail(Err)). 292 293safely_det(Predicate) :- 294 Err = error(_,_), 295 catch(Predicate, Err, 296 print_message_fail(Err)), 297 !. 298safely_det(_). 299 300print_message_fail(Term) :- 301 print_message(error, Term), 302 fail. 303 304udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :- 305 IPAddress = ip(A1, A2, A3, A4), 306 Subnet = ip(S1, S2, S3, S4), 307 BroadcastAddress = ip(B1, B2, B3, B4), 308 309 B1 is A1 \/ (S1 xor 255), 310 B2 is A2 \/ (S2 xor 255), 311 B3 is A3 \/ (S3 xor 255), 312 B4 is A4 \/ (S4 xor 255).
udp_subnet
.321:- dynamic 322 udp_scope/2, 323 udp_scope_peer/2. 324:- volatile 325 udp_scope/2, 326 udp_scope_peer/2. 327% 328% Here's a UDP proxy to Prolog's broadcast library 329% 330% A sender may extend a broadcast to a subnet of a UDP network by 331% specifying a =|udp_subnet|= scoping qualifier in his/her broadcast. 332% The qualifier has the effect of selecting the appropriate multi-cast 333% address for the transmission. Thus, the sender of the message has 334% control over the scope of his/her traffic on a per-message basis. 335% 336% All in-scope listeners receive the broadcast and simply rebroadcast 337% the message locally. All broadcast replies, if any, are sent directly 338% to the sender via the port-id that was received with the broadcast. 339% 340% Each listener exposes two UDP ports, a shared public port that is 341% bound to a well-known port number and a private port that uniquely 342% indentifies the listener. Broadcasts are received on the public port 343% and replies are sent on the private port. Directed broadcasts 344% (unicasts) are received on the private port and replies are sent on 345% the private port. 346 347% Thread 1 listens for directed traffic on the private port. 348% 349 350:- dynamic 351 udp_private_socket/3, % Port, Socket, FileNo 352 udp_public_socket/4, % Scope, Port, Socket, FileNo 353 udp_closed/1. % Scope 354 355udp_inbound_proxy(Master) :- 356 thread_at_exit(inbound_proxy_died), 357 make_private_socket, 358 thread_send_message(Master, udp_inbound_ready), 359 udp_inbound_proxy_loop. 360 361udp_inbound_proxy_loop :- 362 forall(udp_scope(Scope, ScopeData), 363 make_public_socket(ScopeData, Scope)), 364 retractall(udp_closed(_)), 365 findall(FileNo, udp_socket_file_no(FileNo), FileNos), 366 catch(dispatch_inbound(FileNos), 367 E, dispatch_exception(E)), 368 udp_inbound_proxy_loop. 369 370dispatch_exception(E) :- 371 E = error(_,_), 372 !, 373 print_message(warning, E). 374dispatch_exception(_).
384make_private_socket :- 385 udp_private_socket(_Port, S, _F), 386 !, 387 ( ( udp_scope(Scope, broadcast(_,_,_)) 388 ; udp_scope(Scope, multicast(_,_)) 389 ), 390 \+ udp_closed(Scope) 391 -> true 392 ; tcp_close_socket(S), 393 retractall(udp_private_socket(_,_,_)) 394 ). 395make_private_socket :- 396 udp_scope(_, broadcast(_,_,_)), 397 !, 398 udp_socket(S), 399 tcp_bind(S, Port), 400 tcp_getopt(S, file_no(F)), 401 tcp_setopt(S, broadcast), 402 assertz(udp_private_socket(Port, S, F)). 403make_private_socket :- 404 udp_scope(_, multicast(_,_)), 405 !, 406 udp_socket(S), 407 tcp_bind(S, Port), 408 tcp_getopt(S, file_no(F)), 409 assertz(udp_private_socket(Port, S, F)). 410make_private_socket.
416make_public_socket(_, Scope) :- 417 udp_public_socket(Scope, _Port, S, _), 418 !, 419 ( udp_closed(Scope) 420 -> tcp_close_socket(S), 421 retractall(udp_public_socket(Scope, _, _, _)) 422 ; true 423 ). 424make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :- 425 udp_socket(S), 426 tcp_setopt(S, reuseaddr), 427 tcp_bind(S, Port), 428 tcp_getopt(S, file_no(F)), 429 assertz(udp_public_socket(Scope, Port, S, F)). 430make_public_socket(multicast(Group, Port), Scope) :- 431 udp_socket(S), 432 tcp_setopt(S, reuseaddr), 433 tcp_bind(S, Port), 434 tcp_setopt(S, ip_add_membership(Group)), 435 tcp_getopt(S, file_no(F)), 436 assertz(udp_public_socket(Scope, Port, S, F)). 437make_public_socket(unicast(Port), Scope) :- 438 udp_socket(S), 439 tcp_bind(S, Port), 440 tcp_getopt(S, file_no(F)), 441 assertz(udp_public_socket(Scope, Port, S, F)). 442 443udp_socket_file_no(FileNo) :- 444 udp_private_socket(_,_,FileNo). 445udp_socket_file_no(FileNo) :- 446 udp_public_socket(_,_,_,FileNo).
456dispatch_inbound(FileNos) :- 457 debug(udp(broadcast), 'Waiting for ~p', [FileNos]), 458 wait_for_input(FileNos, Ready, infinite), 459 debug(udp(broadcast), 'Ready: ~p', [Ready]), 460 maplist(dispatch_ready, Ready), 461 dispatch_inbound(FileNos). 462 463dispatch_ready(FileNo) :- 464 udp_private_socket(_Port, Private, FileNo), 465 !, 466 udp_receive(Private, Data, From, [max_message_size(65535)]), 467 debug(udp(broadcast), 'Inbound on private port', []), 468 ( in_scope(Scope, From), 469 udp_term_string(Scope, Term, Data) % only accept valid data 470 -> ld_dispatch(Private, Term, From, Scope) 471 ; true 472 ). 473dispatch_ready(FileNo) :- 474 udp_public_socket(Scope, _PublicPort, Public, FileNo), 475 !, 476 udp_receive(Public, Data, From, [max_message_size(65535)]), 477 debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p', 478 [From, Scope]), 479 ( in_scope(Scope, From), 480 udp_term_string(Scope, Term, Data) % only accept valid data 481 -> ( udp_scope(Scope, unicast(_)) 482 -> ld_dispatch(Public, Term, From, Scope) 483 ; udp_private_socket(_PrivatePort, Private, _FileNo), 484 ld_dispatch(Private, Term, From, Scope) 485 ) 486 ; udp_scope(Scope, unicast(_)), 487 udp_term_string(Scope, Term, Data), 488 unicast_out_of_scope_request(Scope, From, Term) 489 -> true 490 ; true 491 ). 492 493in_scope(Scope, Address) :- 494 udp_scope(Scope, ScopeData), 495 in_scope(ScopeData, Scope, Address), 496 !. 497in_scope(Scope, From) :- 498 debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p', 499 [Scope, From]), 500 fail. 501 502in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :- 503 udp_broadcast_address(IP, Subnet, Broadcast). 504in_scope(multicast(_Group, _Port), _Scope, _From). 505in_scope(unicast(_PublicPort), Scope, IP:_) :- 506 udp_peer(Scope, IP:_).
515ld_dispatch(_S, Term, From, _Scope) :- 516 debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]), 517 fail. 518ld_dispatch(_S, Term, _From, _Scope) :- 519 blacklisted(Term), !. 520ld_dispatch(S, request(Key, Term), From, Scope) :- 521 !, 522 forall(safely(broadcast_request(Term)), 523 safely((udp_term_string(Scope, reply(Key,Term), Message), 524 udp_send(S, Message, From, [])))). 525ld_dispatch(_S, send(Term), _From, _Scope) :- 526 !, 527 safely_det(broadcast(Term)). 528ld_dispatch(_S, reply(Key, Term), From, _Scope) :- 529 ( reply_queue(Key, Queue) 530 -> safely(thread_send_message(Queue, Term:From)) 531 ; true 532 ). 533 534blacklisted(send(Term)) :- black_list(Term). 535blacklisted(request(_,Term)) :- black_list(Term). 536blacklisted(reply(_,Term)) :- black_list(Term).
552reload_udp_proxy :- 553 reload_outbound_proxy, 554 reload_inbound_proxy. 555 556reload_outbound_proxy :- 557 listening(udp_broadcast, udp(_,_), _), 558 !. 559reload_outbound_proxy :- 560 listen(udp_broadcast, udp(Scope,Message), 561 udp_broadcast(Message, Scope, 0.25)), 562 listen(udp_broadcast, udp(Scope,Message,Timeout), 563 udp_broadcast(Message, Scope, Timeout)), 564 listen(udp_broadcast, udp_subnet(Message), % backward compatibility 565 udp_broadcast(Message, subnet, 0.25)), 566 listen(udp_broadcast, udp_subnet(Message,Timeout), 567 udp_broadcast(Message, subnet, Timeout)). 568 569reload_inbound_proxy :- 570 catch(thread_signal(udp_inbound_proxy, throw(udp_reload)), 571 error(existence_error(thread, _),_), 572 fail), 573 !. 574reload_inbound_proxy :- 575 thread_self(Me), 576 thread_create(udp_inbound_proxy(Me), _, 577 [ alias(udp_inbound_proxy), 578 detached(true) 579 ]), 580 thread_get_message(Me, udp_inbound_ready, [timeout(10)]). 581 582inbound_proxy_died :- 583 thread_self(Self), 584 thread_property(Self, status(Status)), 585 ( catch(recreate_proxy(Status), _, fail) 586 -> print_message(informational, 587 httpd_restarted_worker(Self)) 588 ; done_status_message_level(Status, Level), 589 print_message(Level, 590 httpd_stopped_worker(Self, Status)) 591 ). 592 593recreate_proxy(exception(Error)) :- 594 recreate_on_error(Error), 595 reload_inbound_proxy. 596 597recreate_on_error('$aborted'). % old 598recreate_on_error(unwind(abort)). 599recreate_on_error(time_limit_exceeded). 600 601done_status_message_level(true, silent) :- !. 602done_status_message_level(exception('$aborted'), silent) :- !. 603done_status_message_level(exception(unwind(abort)), silent) :- !. 604done_status_message_level(_, informational).
611udp_broadcast_close(Scope) :- 612 udp_scope(Scope, _ScopeData), 613 !, 614 assert(udp_closed(Scope)), 615 reload_udp_proxy. 616udp_broadcast_close(_).
Term:Address
to send Term to a specific address or query
the address from which term is answered or it is a plain Term.
If Term is nonground, it is considered is a request (see broadcast_request/1) and the predicate succeeds for each answer received within TimeOut seconds. If Term is ground it is considered an asynchronous broadcast and udp_broadcast/3 is deterministic.
630udp_broadcast(Term:To, Scope, _Timeout) :- 631 ground(Term), ground(To), % broadcast to single listener 632 !, 633 udp_basic_broadcast(send(Term), Scope, single(To)). 634udp_broadcast(Term, Scope, _Timeout) :- 635 ground(Term), % broadcast to all listeners 636 !, 637 udp_basic_broadcast(send(Term), Scope, broadcast). 638udp_broadcast(Term:To, Scope, Timeout) :- 639 ground(To), % request to single listener 640 !, 641 setup_call_cleanup( 642 request_queue(Id, Queue), 643 ( udp_basic_broadcast(request(Id, Term), Scope, single(To)), 644 udp_br_collect_replies(Queue, Timeout, Term:To) 645 ), 646 destroy_request_queue(Queue)). 647udp_broadcast(Term:From, Scope, Timeout) :- 648 !, % request to all listeners, collect sender 649 setup_call_cleanup( 650 request_queue(Id, Queue), 651 ( udp_basic_broadcast(request(Id, Term), Scope, broadcast), 652 udp_br_collect_replies(Queue, Timeout, Term:From) 653 ), 654 destroy_request_queue(Queue)). 655udp_broadcast(Term, Scope, Timeout) :- % request to all listeners 656 udp_broadcast(Term:_, Scope, Timeout). 657 658:- dynamic 659 reply_queue/2. 660 661request_queue(Id, Queue) :- 662 Id is random(1<<63), 663 message_queue_create(Queue), 664 asserta(reply_queue(Id, Queue)). 665 666destroy_request_queue(Queue) :- % leave queue to GC 667 retractall(reply_queue(_, Queue)).
This predicate succeeds with a choice point. Committing the choice point closes S.
680udp_basic_broadcast(Term, Scope, Dest) :- 681 debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]), 682 udp_term_string(Scope, Term, String), 683 udp_send_message(Dest, String, Scope). 684 685udp_send_message(single(Address), String, Scope) :- 686 ( udp_scope(Scope, unicast(_)) 687 -> udp_public_socket(Scope, _Port, S, _) 688 ; udp_private_socket(_Port, S, _F) 689 ), 690 safely(udp_send(S, String, Address, [])). 691udp_send_message(broadcast, String, Scope) :- 692 ( udp_scope(Scope, unicast(_)) 693 -> udp_public_socket(Scope, _Port, S, _), 694 forall(udp_peer(Scope, Address), 695 ( debug(udp(broadcast), 'Unicast to ~p', [Address]), 696 safely(udp_send(S, String, Address, [])))) 697 ; udp_scope(Scope, broadcast(_SubNet, Broadcast, Port)) 698 -> udp_private_socket(_PrivatePort, S, _F), 699 udp_send(S, String, Broadcast:Port, []) 700 ; udp_scope(Scope, multicast(Group, Port)) 701 -> udp_private_socket(_PrivatePort, S, _F), 702 udp_send(S, String, Group:Port, []) 703 ). 704 705% ! udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet. 706% 707% Collect replies on Socket for TimeOut seconds. Succeed for each 708% received message. 709 710udp_br_collect_replies(Queue, Timeout, Reply) :- 711 get_time(Start), 712 Deadline is Start+Timeout, 713 repeat, 714 ( thread_get_message(Queue, Reply, 715 [ deadline(Deadline) 716 ]) 717 -> true 718 ; !, 719 fail 720 ).
ip(A,B,C,D)
or an atom or string of the format A.B.C.D
. Options processed:
subnet
.For compatibility reasons Options may be the subnet mask.
749udp_broadcast_initialize(IP, Options) :- 750 with_mutex(udp_broadcast, 751 udp_broadcast_initialize_sync(IP, Options)). 752 753udp_broadcast_initialize_sync(IP, Options) :- 754 nonvar(Options), 755 Options = ip(_,_,_,_), 756 !, 757 udp_broadcast_initialize(IP, [subnet_mask(Options)]). 758udp_broadcast_initialize_sync(IP, Options) :- 759 to_ip4(IP, IPAddress), 760 option(method(Method), Options, broadcast), 761 must_be(oneof([broadcast, multicast, unicast]), Method), 762 udp_broadcast_initialize_sync(Method, IPAddress, Options), 763 reload_udp_proxy. 764 765udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :- 766 option(subnet_mask(Subnet), Options, _), 767 mk_subnet(Subnet, IPAddress, Subnet4), 768 option(port(Port), Options, 20005), 769 option(scope(Scope), Options, subnet), 770 771 udp_broadcast_address(IPAddress, Subnet4, Broadcast), 772 udp_broadcast_close(Scope), 773 assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))). 774udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :- 775 option(port(Port), Options, 20005), 776 option(scope(Scope), Options, subnet), 777 udp_broadcast_close(Scope), 778 assertz(udp_scope(Scope, unicast(Port))). 779udp_broadcast_initialize_sync(multicast, IPAddress, Options) :- 780 option(port(Port), Options, 20005), 781 option(scope(Scope), Options, subnet), 782 udp_broadcast_close(Scope), 783 multicast_address(IPAddress), 784 assertz(udp_scope(Scope, multicast(IPAddress, Port))). 785 786to_ip4(Atomic, ip(A,B,C,D)) :- 787 atomic(Atomic), 788 !, 789 ( split_string(Atomic, ".", "", Strings), 790 maplist(number_string, [A,B,C,D], Strings) 791 -> true 792 ; syntax_error(illegal_ip_address) 793 ). 794to_ip4(IP, IP). 795 796mk_subnet(Var, IP, Subnet) :- 797 var(Var), 798 !, 799 ( default_subnet(IP, Subnet) 800 -> true 801 ; domain_error(ip_with_subnet, IP) 802 ). 803mk_subnet(Subnet, _, Subnet4) :- 804 to_ip4(Subnet, Subnet4).
813default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :- 814 between(0,127, A), !. 815default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :- 816 between(128,191, A), !. 817default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :- 818 between(192,223, A), !. 819 820multicast_address(ip(A,_,_,_)) :- 821 between(224, 239, A), 822 !. 823multicast_address(IP) :- 824 domain_error(multicast_network, IP). 825 826 827 /******************************* 828 * UNICAST PEERS * 829 *******************************/
841udp_peer_add(Scope, Address) :- 842 must_be(ground, Address), 843 peer_address(Address, Scope, Canonical), 844 ( udp_scope_peer(Scope, Canonical) 845 -> true 846 ; assertz(udp_scope_peer(Scope, Canonical)) 847 ). 848 849udp_peer_del(Scope, Address) :- 850 peer_address(Address, Scope, Canonical), 851 retractall(udp_scope_peer(Scope, Canonical)). 852 853udp_peer(Scope, IPAddress) :- 854 udp_scope_peer(Scope, IPAddress). 855 856peer_address(IP:Port, _Scope, IPAddress:Port) :- 857 !, 858 to_ip4(IP, IPAddress). 859peer_address(IP, Scope, IPAddress:Port) :- 860 ( udp_scope(Scope, unicast(Port)) 861 -> true 862 ; existence_error(udp_scope, Scope) 863 ), 864 to_ip4(IP, IPAddress). 865 866 867 868 /******************************* 869 * HOOKS * 870 *******************************/
%prolog\n
, followed by the Prolog term in quoted notation while
ignoring operators. This hook may use alternative serialization such
as fast_term_serialized/2, use library(ssl) to realise encrypted
messages, etc.
In mode (+,-), Term is written with the options ignore_ops(true)
and
quoted(true)
.
This predicate first calls udp_term_string_hook/3.
910udp_term_string(Scope, Term, String) :- 911 catch(udp_term_string_hook(Scope, Term, String), udp(Error), true), 912 !, 913 ( var(Error) 914 -> true 915 ; Error == invalid_message 916 -> fail 917 ; throw(udp(Error)) 918 ). 919udp_term_string(_Scope, Term, String) :- 920 ( var(String) 921 -> format(string(String), '%-prolog-\n~W', 922 [ Term, 923 [ ignore_ops(true), 924 quoted(true) 925 ] 926 ]) 927 ; sub_string(String, 0, _, _, '%-prolog-\n'), 928 term_string(Term, String, 929 [ syntax_errors(quiet) 930 ]) 931 ).
This hook is intended to initiate a new node joining the network of
peers. We could in theory also omit the in-scope test and use a
normal broadcast to join. Using a different channal however provides
a basic level of security. A possibe implementation is below. The
first fragment is a hook added to the server, the second is a
predicate added to a client and the last initiates the request in
the client. The excanged term (join(X)
) can be used to exchange a
welcome handshake.
:- multifile udp_broadcast:udp_unicast_join_hook/3. udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :- udp_peer_add(Scope, From),
join_request(Scope, Address, Reply) :- udp_peer_add(Scope, Address), broadcast_request(udp(Scope, join(X))).
?- join_request(myscope, "1.2.3.4":10001, Reply). Reply = welcome.
969unicast_out_of_scope_request(Scope, From, send(Term)) :- 970 udp_unicast_join_hook(Scope, From, Term). 971unicast_out_of_scope_request(Scope, From, request(Key, Term)) :- 972 udp_unicast_join_hook(Scope, From, Term), 973 udp_public_socket(Scope, _Port, Socket, _FileNo), 974 safely((udp_term_string(Scope, reply(Key,Term), Message), 975 udp_send(Socket, Message, From, [])))
A UDP broadcast proxy
SWI-Prolog's broadcast library provides a means that may be used to facilitate publish and subscribe communication regimes between anonymous members of a community of interest. The members of the community are however, necessarily limited to a single instance of Prolog. The UDP broadcast library removes that restriction. With this library loaded, any member on your local IP subnetwork that also has this library loaded may hear and respond to your broadcasts.
This library support three styles of networking as described below. Each of these networks have their own advantages and disadvantages. Please study the literature to understand the consequences.
After initialization and, in the case of a unicast network managing the set of peers, communication happens through broadcast/1, broadcast_request/1 and listen/1,2,3.
A broadcast/1 or broadcast_request/1 of the shape
udp(Scope, Term)
orudp(Scope, Term, TimeOut)
is forwarded over the UDP network to all peers that joined the same Scope. To prevent the potential for feedback loops, only the plain Term is broadcasted locally. The timeout is optional. It specifies the amount to time to wait for replies to arrive in response to a broadcast_request/1. The default period is 0.250 seconds. The timeout is ignored for broadcasts.An example of three separate processes cooperating in the same scope called
peers
:It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the ip-address and port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a UDP scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.
For example, in order to discover who responded with a particular value:
All incomming trafic is handled by a single thread with the alias
udp_inbound_proxy
. This thread also performs the internal dispatching using broadcast/1 and broadcast_request/1. Future versions may provide for handling these requests in separate threads.Caveats
While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:
udp_subnet
scope is not reentrant. If a listener performs a broadcast_request/1 with UDP scope recursively, then disaster looms certain. This caveat does not apply to a UDP scoped broadcast/1, which can safely be performed from a listener context.tipc.pl
*/