View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald and Jan Wielemaker
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2012-2013, Jeffrey Rosenwald
    7		   2018-2020, CWI Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(udp_broadcast,
   37          [ udp_broadcast_initialize/2,         % +IPAddress, +Options
   38            udp_broadcast_close/1,		% +Scope
   39
   40            udp_peer_add/2,                     % +Scope, +IP
   41            udp_peer_del/2,                     % +Scope, ?IP
   42            udp_peer/2                          % +Scope, -IP
   43          ]).   44:- autoload(library(apply),[maplist/2,maplist/3]).   45:- autoload(library(backcomp),[thread_at_exit/1]).   46:- autoload(library(broadcast),
   47	    [broadcast_request/1,broadcast/1,listening/3,listen/3]).   48:- autoload(library(debug),[debug/3]).   49:- autoload(library(error),
   50	    [must_be/2,syntax_error/1,domain_error/2,existence_error/2]).   51:- autoload(library(option),[option/3]).   52:- autoload(library(socket),
   53	    [ tcp_close_socket/1,
   54	      udp_socket/1,
   55	      tcp_bind/2,
   56	      tcp_getopt/2,
   57	      tcp_setopt/2,
   58	      udp_receive/4,
   59	      udp_send/4
   60	    ]).   61
   62
   63% :- debug(udp(broadcast)).

A UDP broadcast proxy

SWI-Prolog's broadcast library provides a means that may be used to facilitate publish and subscribe communication regimes between anonymous members of a community of interest. The members of the community are however, necessarily limited to a single instance of Prolog. The UDP broadcast library removes that restriction. With this library loaded, any member on your local IP subnetwork that also has this library loaded may hear and respond to your broadcasts.

This library support three styles of networking as described below. Each of these networks have their own advantages and disadvantages. Please study the literature to understand the consequences.

broadcast
Broadcast messages are sent to the LAN subnet. The broadcast implementation uses two UDP ports: a public to address the whole group and a private one to address a specific node. Broadcasting is generally a good choice if the subnet is small and traffic is low.
unicast
Unicast sends copies of packages to known peers. Unicast networks can easily be routed. The unicast version uses a single UDP port per node. Unicast is generally a good choice for a small party, in particular if the peers are in different networks.
multicast
Multicast is like broadcast, but it can be configured to work accross networks and may work more efficiently on VLAN networks. Like the broadcast setup, two UDP ports are used. Multicasting can in general deliver the most efficient LAN and WAN networks, but requires properly configured routing between the peers.

After initialization and, in the case of a unicast network managing the set of peers, communication happens through broadcast/1, broadcast_request/1 and listen/1,2,3.

A broadcast/1 or broadcast_request/1 of the shape udp(Scope, Term) or udp(Scope, Term, TimeOut) is forwarded over the UDP network to all peers that joined the same Scope. To prevent the potential for feedback loops, only the plain Term is broadcasted locally. The timeout is optional. It specifies the amount to time to wait for replies to arrive in response to a broadcast_request/1. The default period is 0.250 seconds. The timeout is ignored for broadcasts.

An example of three separate processes cooperating in the same scope called peers:

Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(udp(peers, number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-

It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a UDP scoped broadcast/1 or broadcast_request/1, where PortId is the ip-address and port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a UDP scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.

For example, in order to discover who responded with a particular value:

Host B Process 1:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Host A Process 1:


   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Host A Process 2:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?- bagof(X, broadcast_request(udp(peers,number(X):From,1)), Xs).
   From = ip(192, 168, 1, 103):34855,
   Xs = [7, 8, 9] ;
   From = ip(192, 168, 1, 103):56331,
   Xs = [1, 2, 3, 4, 5] ;
   From = ip(192, 168, 1, 104):3217,
   Xs = [1, 2, 3, 4, 5].

All incomming trafic is handled by a single thread with the alias udp_inbound_proxy. This thread also performs the internal dispatching using broadcast/1 and broadcast_request/1. Future versions may provide for handling these requests in separate threads.

Caveats

While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:

author
- Jeffrey Rosenwald (JeffRose@acm.org), Jan Wielemaker
See also
- tipc.pl */
license
- BSD-2
  279:- multifile
  280    udp_term_string_hook/3,                     % +Scope, ?Term, ?String
  281    udp_unicast_join_hook/3,                    % +Scope, +From, +Data
  282    black_list/1.                               % +Term
  283
  284:- meta_predicate
  285    safely(0),
  286    safely_det(0).  287
  288safely(Predicate) :-
  289    Err = error(_,_),
  290    catch(Predicate, Err,
  291          print_message_fail(Err)).
  292
  293safely_det(Predicate) :-
  294    Err = error(_,_),
  295    catch(Predicate, Err,
  296          print_message_fail(Err)),
  297    !.
  298safely_det(_).
  299
  300print_message_fail(Term) :-
  301    print_message(error, Term),
  302    fail.
  303
  304udp_broadcast_address(IPAddress, Subnet, BroadcastAddress) :-
  305    IPAddress = ip(A1, A2, A3, A4),
  306    Subnet = ip(S1, S2, S3, S4),
  307    BroadcastAddress = ip(B1, B2, B3, B4),
  308
  309    B1 is A1 \/ (S1 xor 255),
  310    B2 is A2 \/ (S2 xor 255),
  311    B3 is A3 \/ (S3 xor 255),
  312    B4 is A4 \/ (S4 xor 255).
 udp_broadcast_service(?Scope, ?Address) is nondet
provides the UDP broadcast address for a given Scope. At present, only one scope is supported, udp_subnet.
 udp_scope(?ScopeName, ?ScopeDef)
  321:- dynamic
  322    udp_scope/2,
  323    udp_scope_peer/2.  324:- volatile
  325    udp_scope/2,
  326    udp_scope_peer/2.  327%
  328%  Here's a UDP proxy to Prolog's broadcast library
  329%
  330%  A sender may extend a broadcast  to  a   subnet  of  a UDP network by
  331%  specifying a =|udp_subnet|= scoping qualifier   in his/her broadcast.
  332%  The qualifier has the effect of  selecting the appropriate multi-cast
  333%  address for the transmission. Thus,  the   sender  of the message has
  334%  control over the scope of his/her traffic on a per-message basis.
  335%
  336%  All in-scope listeners receive the   broadcast and simply rebroadcast
  337%  the message locally. All broadcast replies, if any, are sent directly
  338%  to the sender via the port-id that   was received with the broadcast.
  339%
  340%  Each listener exposes two UDP ports,  a   shared  public port that is
  341%  bound to a well-known port number and   a  private port that uniquely
  342%  indentifies the listener. Broadcasts are received  on the public port
  343%  and replies are  sent  on  the   private  port.  Directed  broadcasts
  344%  (unicasts) are received on the private port   and replies are sent on
  345%  the private port.
  346
  347%  Thread 1 listens for directed traffic on the private port.
  348%
  349
  350:- dynamic
  351    udp_private_socket/3,                       % Port, Socket, FileNo
  352    udp_public_socket/4,                        % Scope, Port, Socket, FileNo
  353    udp_closed/1.				% Scope
  354
  355udp_inbound_proxy(Master) :-
  356    thread_at_exit(inbound_proxy_died),
  357    make_private_socket,
  358    thread_send_message(Master, udp_inbound_ready),
  359    udp_inbound_proxy_loop.
  360
  361udp_inbound_proxy_loop :-
  362    forall(udp_scope(Scope, ScopeData),
  363           make_public_socket(ScopeData, Scope)),
  364    retractall(udp_closed(_)),
  365    findall(FileNo, udp_socket_file_no(FileNo), FileNos),
  366    catch(dispatch_inbound(FileNos),
  367          E, dispatch_exception(E)),
  368    udp_inbound_proxy_loop.
  369
  370dispatch_exception(E) :-
  371    E = error(_,_),
  372    !,
  373    print_message(warning, E).
  374dispatch_exception(_).
 make_private_socket is det
Create our private socket. This socket is used for messages that are directed to me. Note that we only need this for broadcast networks. If we use a unicast network we use our public port to contact this specific server.
  384make_private_socket :-
  385    udp_private_socket(_Port, S, _F),
  386    !,
  387    (   (   udp_scope(Scope, broadcast(_,_,_))
  388        ;   udp_scope(Scope, multicast(_,_))
  389        ),
  390        \+ udp_closed(Scope)
  391    ->  true
  392    ;   tcp_close_socket(S),
  393        retractall(udp_private_socket(_,_,_))
  394    ).
  395make_private_socket :-
  396    udp_scope(_, broadcast(_,_,_)),
  397    !,
  398    udp_socket(S),
  399    tcp_bind(S, Port),
  400    tcp_getopt(S, file_no(F)),
  401    tcp_setopt(S, broadcast),
  402    assertz(udp_private_socket(Port, S, F)).
  403make_private_socket :-
  404    udp_scope(_, multicast(_,_)),
  405    !,
  406    udp_socket(S),
  407    tcp_bind(S, Port),
  408    tcp_getopt(S, file_no(F)),
  409    assertz(udp_private_socket(Port, S, F)).
  410make_private_socket.
 make_public_socket(+ScopeData, +Scope)
Create the public port Scope.
  416make_public_socket(_, Scope) :-
  417    udp_public_socket(Scope, _Port, S, _),
  418    !,
  419    (   udp_closed(Scope)
  420    ->  tcp_close_socket(S),
  421        retractall(udp_public_socket(Scope, _, _, _))
  422    ;   true
  423    ).
  424make_public_socket(broadcast(_SubNet, _Broadcast, Port), Scope) :-
  425    udp_socket(S),
  426    tcp_setopt(S, reuseaddr),
  427    tcp_bind(S, Port),
  428    tcp_getopt(S, file_no(F)),
  429    assertz(udp_public_socket(Scope, Port, S, F)).
  430make_public_socket(multicast(Group, Port), Scope) :-
  431    udp_socket(S),
  432    tcp_setopt(S, reuseaddr),
  433    tcp_bind(S, Port),
  434    tcp_setopt(S, ip_add_membership(Group)),
  435    tcp_getopt(S, file_no(F)),
  436    assertz(udp_public_socket(Scope, Port, S, F)).
  437make_public_socket(unicast(Port), Scope) :-
  438    udp_socket(S),
  439    tcp_bind(S, Port),
  440    tcp_getopt(S, file_no(F)),
  441    assertz(udp_public_socket(Scope, Port, S, F)).
  442
  443udp_socket_file_no(FileNo) :-
  444    udp_private_socket(_,_,FileNo).
  445udp_socket_file_no(FileNo) :-
  446    udp_public_socket(_,_,_,FileNo).
 dispatch_inbound(+FileNos)
Dispatch inbound traffic. This loop uses wait_for_input/3 to wait for one or more UDP sockets and dispatches the requests using the internal broadcast service. For an incomming broadcast request we send the reply only to the requester and therefore we must use a socket that is not in broadcast mode.
  456dispatch_inbound(FileNos) :-
  457    debug(udp(broadcast), 'Waiting for ~p', [FileNos]),
  458    wait_for_input(FileNos, Ready, infinite),
  459    debug(udp(broadcast), 'Ready: ~p', [Ready]),
  460    maplist(dispatch_ready, Ready),
  461    dispatch_inbound(FileNos).
  462
  463dispatch_ready(FileNo) :-
  464    udp_private_socket(_Port, Private, FileNo),
  465    !,
  466    udp_receive(Private, Data, From, [max_message_size(65535)]),
  467    debug(udp(broadcast), 'Inbound on private port', []),
  468    (   in_scope(Scope, From),
  469        udp_term_string(Scope, Term, Data) % only accept valid data
  470    ->  ld_dispatch(Private, Term, From, Scope)
  471    ;   true
  472    ).
  473dispatch_ready(FileNo) :-
  474    udp_public_socket(Scope, _PublicPort, Public, FileNo),
  475    !,
  476    udp_receive(Public, Data, From, [max_message_size(65535)]),
  477    debug(udp(broadcast), 'Inbound on public port from ~p for scope ~p',
  478          [From, Scope]),
  479    (   in_scope(Scope, From),
  480        udp_term_string(Scope, Term, Data) % only accept valid data
  481    ->  (   udp_scope(Scope, unicast(_))
  482        ->  ld_dispatch(Public, Term, From, Scope)
  483        ;   udp_private_socket(_PrivatePort, Private, _FileNo),
  484            ld_dispatch(Private, Term, From, Scope)
  485        )
  486    ;   udp_scope(Scope, unicast(_)),
  487        udp_term_string(Scope, Term, Data),
  488        unicast_out_of_scope_request(Scope, From, Term)
  489    ->  true
  490    ;   true
  491    ).
  492
  493in_scope(Scope, Address) :-
  494    udp_scope(Scope, ScopeData),
  495    in_scope(ScopeData, Scope, Address),
  496    !.
  497in_scope(Scope, From) :-
  498    debug(udp(broadcast), 'Out-of-scope ~p datagram from ~p',
  499          [Scope, From]),
  500    fail.
  501
  502in_scope(broadcast(Subnet, Broadcast, _PublicPort), _Scope, IP:_FromPort) :-
  503    udp_broadcast_address(IP, Subnet, Broadcast).
  504in_scope(multicast(_Group, _Port), _Scope, _From).
  505in_scope(unicast(_PublicPort), Scope, IP:_) :-
  506    udp_peer(Scope, IP:_).
 ld_dispatch(+PrivateSocket, +Term, +From, +Scope)
Locally dispatch Term received from From. If it concerns a broadcast request, send the replies to PrivateSocket to From. The multifile hook black_list/1 can be used to ignore certain messages.
  515ld_dispatch(_S, Term, From, _Scope) :-
  516    debug(udp(broadcast), 'ld_dispatch(~p) from ~p', [Term, From]),
  517    fail.
  518ld_dispatch(_S, Term, _From, _Scope) :-
  519    blacklisted(Term), !.
  520ld_dispatch(S, request(Key, Term), From, Scope) :-
  521    !,
  522    forall(safely(broadcast_request(Term)),
  523           safely((udp_term_string(Scope, reply(Key,Term), Message),
  524                   udp_send(S, Message, From, [])))).
  525ld_dispatch(_S, send(Term), _From, _Scope) :-
  526    !,
  527    safely_det(broadcast(Term)).
  528ld_dispatch(_S, reply(Key, Term), From, _Scope) :-
  529    (   reply_queue(Key, Queue)
  530    ->  safely(thread_send_message(Queue, Term:From))
  531    ;   true
  532    ).
  533
  534blacklisted(send(Term))      :- black_list(Term).
  535blacklisted(request(_,Term)) :- black_list(Term).
  536blacklisted(reply(_,Term))   :- black_list(Term).
 reload_udp_proxy
Update the UDP relaying proxy service. The proxy consists of three forwarding mechanisms:
  552reload_udp_proxy :-
  553    reload_outbound_proxy,
  554    reload_inbound_proxy.
  555
  556reload_outbound_proxy :-
  557    listening(udp_broadcast, udp(_,_), _),
  558    !.
  559reload_outbound_proxy :-
  560    listen(udp_broadcast, udp(Scope,Message),
  561           udp_broadcast(Message, Scope, 0.25)),
  562    listen(udp_broadcast, udp(Scope,Message,Timeout),
  563           udp_broadcast(Message, Scope, Timeout)),
  564    listen(udp_broadcast, udp_subnet(Message),  % backward compatibility
  565           udp_broadcast(Message, subnet, 0.25)),
  566    listen(udp_broadcast, udp_subnet(Message,Timeout),
  567           udp_broadcast(Message, subnet, Timeout)).
  568
  569reload_inbound_proxy :-
  570    catch(thread_signal(udp_inbound_proxy, throw(udp_reload)),
  571          error(existence_error(thread, _),_),
  572          fail),
  573    !.
  574reload_inbound_proxy :-
  575    thread_self(Me),
  576    thread_create(udp_inbound_proxy(Me), _,
  577                  [ alias(udp_inbound_proxy),
  578                    detached(true)
  579                  ]),
  580    thread_get_message(Me, udp_inbound_ready, [timeout(10)]).
  581
  582inbound_proxy_died :-
  583    thread_self(Self),
  584    thread_property(Self, status(Status)),
  585    (   catch(recreate_proxy(Status), _, fail)
  586    ->  print_message(informational,
  587                      httpd_restarted_worker(Self))
  588    ;   done_status_message_level(Status, Level),
  589        print_message(Level,
  590                      httpd_stopped_worker(Self, Status))
  591    ).
  592
  593recreate_proxy(exception(Error)) :-
  594    recreate_on_error(Error),
  595    reload_inbound_proxy.
  596
  597recreate_on_error('$aborted').
  598recreate_on_error(time_limit_exceeded).
  599
  600done_status_message_level(true, silent) :- !.
  601done_status_message_level(exception('$aborted'), silent) :- !.
  602done_status_message_level(_, informational).
 udp_broadcast_close(+Scope)
Close a UDP broadcast scope.
  609udp_broadcast_close(Scope) :-
  610    udp_scope(Scope, _ScopeData),
  611    !,
  612    assert(udp_closed(Scope)),
  613    reload_udp_proxy.
  614udp_broadcast_close(_).
 udp_broadcast(+What, +Scope, +TimeOut)
Send a broadcast request to my UDP peers in Scope. What is either of the shape Term:Address to send Term to a specific address or query the address from which term is answered or it is a plain Term.

If Term is nonground, it is considered is a request (see broadcast_request/1) and the predicate succeeds for each answer received within TimeOut seconds. If Term is ground it is considered an asynchronous broadcast and udp_broadcast/3 is deterministic.

  628udp_broadcast(Term:To, Scope, _Timeout) :-
  629    ground(Term), ground(To),           % broadcast to single listener
  630    !,
  631    udp_basic_broadcast(send(Term), Scope, single(To)).
  632udp_broadcast(Term, Scope, _Timeout) :-
  633    ground(Term),                       % broadcast to all listeners
  634    !,
  635    udp_basic_broadcast(send(Term), Scope, broadcast).
  636udp_broadcast(Term:To, Scope, Timeout) :-
  637    ground(To),                         % request to single listener
  638    !,
  639    setup_call_cleanup(
  640        request_queue(Id, Queue),
  641        ( udp_basic_broadcast(request(Id, Term), Scope, single(To)),
  642          udp_br_collect_replies(Queue, Timeout, Term:To)
  643        ),
  644        destroy_request_queue(Queue)).
  645udp_broadcast(Term:From, Scope, Timeout) :-
  646    !,                                  % request to all listeners, collect sender
  647    setup_call_cleanup(
  648        request_queue(Id, Queue),
  649        ( udp_basic_broadcast(request(Id, Term), Scope, broadcast),
  650          udp_br_collect_replies(Queue, Timeout, Term:From)
  651        ),
  652        destroy_request_queue(Queue)).
  653udp_broadcast(Term, Scope, Timeout) :-  % request to all listeners
  654    udp_broadcast(Term:_, Scope, Timeout).
  655
  656:- dynamic
  657    reply_queue/2.  658
  659request_queue(Id, Queue) :-
  660    Id is random(1<<63),
  661    message_queue_create(Queue),
  662    asserta(reply_queue(Id, Queue)).
  663
  664destroy_request_queue(Queue) :-         % leave queue to GC
  665    retractall(reply_queue(_, Queue)).
 udp_basic_broadcast(+Term, +Dest) is multi
Create a UDP private socket and use it to send Term to Address. If Address is our broadcast address, set the socket in broadcast mode.

This predicate succeeds with a choice point. Committing the choice point closes S.

Arguments:
Dest- is one of single(Target) or broadcast.
  678udp_basic_broadcast(Term, Scope, Dest) :-
  679    debug(udp(broadcast), 'UDP proxy outbound ~p to ~p', [Term, Dest]),
  680    udp_term_string(Scope, Term, String),
  681    udp_send_message(Dest, String, Scope).
  682
  683udp_send_message(single(Address), String, Scope) :-
  684    (   udp_scope(Scope, unicast(_))
  685    ->  udp_public_socket(Scope, _Port, S, _)
  686    ;   udp_private_socket(_Port, S, _F)
  687    ),
  688    safely(udp_send(S, String, Address, [])).
  689udp_send_message(broadcast, String, Scope) :-
  690    (   udp_scope(Scope, unicast(_))
  691    ->  udp_public_socket(Scope, _Port, S, _),
  692        forall(udp_peer(Scope, Address),
  693               ( debug(udp(broadcast), 'Unicast to ~p', [Address]),
  694                 safely(udp_send(S, String, Address, []))))
  695    ;   udp_scope(Scope, broadcast(_SubNet, Broadcast, Port))
  696    ->  udp_private_socket(_PrivatePort, S, _F),
  697        udp_send(S, String, Broadcast:Port, [])
  698    ;   udp_scope(Scope, multicast(Group, Port))
  699    ->  udp_private_socket(_PrivatePort, S, _F),
  700        udp_send(S, String, Group:Port, [])
  701    ).
  702
  703% ! udp_br_collect_replies(+Queue, +TimeOut, -TermAndFrom) is nondet.
  704%
  705%   Collect replies on Socket for  TimeOut   seconds.  Succeed  for each
  706%   received message.
  707
  708udp_br_collect_replies(Queue, Timeout, Reply) :-
  709    get_time(Start),
  710    Deadline is Start+Timeout,
  711    repeat,
  712       (   thread_get_message(Queue, Reply,
  713                              [ deadline(Deadline)
  714                              ])
  715       ->  true
  716       ;   !,
  717           fail
  718       ).
 udp_broadcast_initialize(+IPAddress, +Options) is semidet
Initialized UDP broadcast bridge. IPAddress is the IP address on the network we want to broadcast on. IP addresses are terms ip(A,B,C,D) or an atom or string of the format A.B.C.D. Options processed:
scope(+ScopeName)
Name of the scope. Default is subnet.
subnet_mask(+SubNet)
Subnet to broadcast on. This uses the same syntax as IPAddress. Default classifies the network as class A, B or C depending on the the first octet and applies the default mask.
port(+Port)
Public port to use. Default is 20005.
method(+Method)
Method to send a message to multiple peers. One of
broadcast
Use UDP broadcast messages to the LAN. This is the default
multicast
Use UDP multicast messages. This can be used on WAN networks, provided the intermediate routers understand multicast.
unicast
Send the messages individually to all registered peers.

For compatibility reasons Options may be the subnet mask.

  747udp_broadcast_initialize(IP, Options) :-
  748    with_mutex(udp_broadcast,
  749               udp_broadcast_initialize_sync(IP, Options)).
  750
  751udp_broadcast_initialize_sync(IP, Options) :-
  752    nonvar(Options),
  753    Options = ip(_,_,_,_),
  754    !,
  755    udp_broadcast_initialize(IP, [subnet_mask(Options)]).
  756udp_broadcast_initialize_sync(IP, Options) :-
  757    to_ip4(IP, IPAddress),
  758    option(method(Method), Options, broadcast),
  759    must_be(oneof([broadcast, multicast, unicast]), Method),
  760    udp_broadcast_initialize_sync(Method, IPAddress, Options),
  761    reload_udp_proxy.
  762
  763udp_broadcast_initialize_sync(broadcast, IPAddress, Options) :-
  764    option(subnet_mask(Subnet), Options, _),
  765    mk_subnet(Subnet, IPAddress, Subnet4),
  766    option(port(Port), Options, 20005),
  767    option(scope(Scope), Options, subnet),
  768
  769    udp_broadcast_address(IPAddress, Subnet4, Broadcast),
  770    udp_broadcast_close(Scope),
  771    assertz(udp_scope(Scope, broadcast(Subnet4, Broadcast, Port))).
  772udp_broadcast_initialize_sync(unicast, _IPAddress, Options) :-
  773    option(port(Port), Options, 20005),
  774    option(scope(Scope), Options, subnet),
  775    udp_broadcast_close(Scope),
  776    assertz(udp_scope(Scope, unicast(Port))).
  777udp_broadcast_initialize_sync(multicast, IPAddress, Options) :-
  778    option(port(Port), Options, 20005),
  779    option(scope(Scope), Options, subnet),
  780    udp_broadcast_close(Scope),
  781    multicast_address(IPAddress),
  782    assertz(udp_scope(Scope, multicast(IPAddress, Port))).
  783
  784to_ip4(Atomic, ip(A,B,C,D)) :-
  785    atomic(Atomic),
  786    !,
  787    (   split_string(Atomic, ".", "", Strings),
  788        maplist(number_string, [A,B,C,D], Strings)
  789    ->  true
  790    ;   syntax_error(illegal_ip_address)
  791    ).
  792to_ip4(IP, IP).
  793
  794mk_subnet(Var, IP, Subnet) :-
  795    var(Var),
  796    !,
  797    (   default_subnet(IP, Subnet)
  798    ->  true
  799    ;   domain_error(ip_with_subnet, IP)
  800    ).
  801mk_subnet(Subnet, _, Subnet4) :-
  802    to_ip4(Subnet, Subnet4).
  803
  804default_subnet(ip(A,_,_,_), ip(A,0,0,0)) :-
  805    between(1,126, A), !.
  806default_subnet(ip(A,B,_,_), ip(A,B,0,0)) :-
  807    between(128,191, A), !.
  808default_subnet(ip(A,B,C,_), ip(A,B,C,0)) :-
  809    between(192,223, A), !.
  810
  811multicast_address(ip(A,_,_,_)) :-
  812    between(224, 239, A),
  813    !.
  814multicast_address(IP) :-
  815    domain_error(multicast_network, IP).
  816
  817
  818		 /*******************************
  819		 *          UNICAST PEERS	*
  820		 *******************************/
 udp_peer_add(+Scope, +Address) is det
 udp_peer_del(+Scope, ?Address) is det
 udp_peer(?Scope, ?Address) is nondet
Manage and query the set of known peers for a unicast network. Address is either a term IP:Port or a plain IP address. In the latter case the default port registered with the scope is used.
Arguments:
Address- has canonical form ip(A,B,C,D):Port.
  832udp_peer_add(Scope, Address) :-
  833    must_be(ground, Address),
  834    peer_address(Address, Scope, Canonical),
  835    (   udp_scope_peer(Scope, Canonical)
  836    ->  true
  837    ;   assertz(udp_scope_peer(Scope, Canonical))
  838    ).
  839
  840udp_peer_del(Scope, Address) :-
  841    peer_address(Address, Scope, Canonical),
  842    retractall(udp_scope_peer(Scope, Canonical)).
  843
  844udp_peer(Scope, IPAddress) :-
  845    udp_scope_peer(Scope, IPAddress).
  846
  847peer_address(IP:Port, _Scope, IPAddress:Port) :-
  848    !,
  849    to_ip4(IP, IPAddress).
  850peer_address(IP, Scope, IPAddress:Port) :-
  851    (   udp_scope(Scope, unicast(Port))
  852    ->  true
  853    ;   existence_error(udp_scope, Scope)
  854    ),
  855    to_ip4(IP, IPAddress).
  856
  857
  858
  859		 /*******************************
  860		 *             HOOKS		*
  861		 *******************************/
 udp_term_string_hook(+Scope, +Term, -String) is det
udp_term_string_hook(+Scope, -Term, +String) is semidet
Hook for serializing the message Term. The default writes %prolog\n, followed by the Prolog term in quoted notation while ignoring operators. This hook may use alternative serialization such as fast_term_serialized/2, use library(ssl) to realise encrypted messages, etc.
Arguments:
Scope- is the scope for which the message is broadcasted. This can be used to use different serialization for different scopes.
Term- encapsulates the term broadcasted by the application as follows:
send(ApplTerm)
Is sent by broadcast(udp(Scope, ApplTerm))
request(Id, ApplTerm)
Is sent by broadcast_request/1, where Id is a unique large (64 bit) integer.
reply(Id, ApplTerm)
Is sent to reply on a broadcast_request/1 request that has been received. Arguments are the same as above.
throws
- The hook may throw udp(invalid_message) to stop processing the message.
 udp_term_string(+Scope, +Term, -String) is det
udp_term_string(+Scope, -Term, +String) is semidet
Serialize an arbitrary Prolog term as a string. The string is prefixed by a magic key to ensure we only accept messages that are meant for us.

In mode (+,-), Term is written with the options ignore_ops(true) and quoted(true).

This predicate first calls udp_term_string_hook/3.

  901udp_term_string(Scope, Term, String) :-
  902    catch(udp_term_string_hook(Scope, Term, String), udp(Error), true),
  903    !,
  904    (   var(Error)
  905    ->  true
  906    ;   Error == invalid_message
  907    ->  fail
  908    ;   throw(udp(Error))
  909    ).
  910udp_term_string(_Scope, Term, String) :-
  911    (   var(String)
  912    ->  format(string(String), '%-prolog-\n~W',
  913               [ Term,
  914                 [ ignore_ops(true),
  915                   quoted(true)
  916                 ]
  917               ])
  918    ;   sub_string(String, 0, _, _, '%-prolog-\n'),
  919        term_string(Term, String,
  920                    [ syntax_errors(quiet)
  921                    ])
  922    ).
 unicast_out_of_scope_request(+Scope, +From, +Data) is semidet
 udp_unicast_join_hook(+Scope, +From, +Data) is semidet
This multifile hook is called if an UDP package is received on the port of the unicast network identified by Scope. From is the origin IP and port and Data is the message data that is deserialized as defined for the scope (see udp_term_string/3).

This hook is intended to initiate a new node joining the network of peers. We could in theory also omit the in-scope test and use a normal broadcast to join. Using a different channal however provides a basic level of security. A possibe implementation is below. The first fragment is a hook added to the server, the second is a predicate added to a client and the last initiates the request in the client. The excanged term (join(X)) can be used to exchange a welcome handshake.

:- multifile udp_broadcast:udp_unicast_join_hook/3.
udp_broadcast:udp_unicast_join_hook(Scope, From, join(welcome)) :-
    udp_peer_add(Scope, From),
join_request(Scope, Address, Reply) :-
    udp_peer_add(Scope, Address),
    broadcast_request(udp(Scope, join(X))).
?- join_request(myscope, "1.2.3.4":10001, Reply).
Reply = welcome.
  960unicast_out_of_scope_request(Scope, From, send(Term)) :-
  961    udp_unicast_join_hook(Scope, From, Term).
  962unicast_out_of_scope_request(Scope, From, request(Key, Term)) :-
  963    udp_unicast_join_hook(Scope, From, Term),
  964    udp_public_socket(Scope, _Port, Socket, _FileNo),
  965    safely((udp_term_string(Scope, reply(Key,Term), Message),
  966            udp_send(Socket, Message, From, [])))