View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2007-2020, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread,
   38          [ concurrent/3,               % +Threads, :Goals, +Options
   39            concurrent_maplist/2,       % :Goal, +List
   40            concurrent_maplist/3,       % :Goal, ?List1, ?List2
   41            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
   42            concurrent_forall/2,        % :Generate, :Test
   43            concurrent_forall/3,        % :Generate, :Test, +Options
   44            concurrent_and/2,           % :Generator,:Test
   45            concurrent_and/3,           % :Generator,:Test,+Options
   46            first_solution/3,           % -Var, :Goals, +Options
   47
   48            call_in_thread/2            % +Thread, :Goal
   49          ]).   50:- autoload(library(apply),[maplist/2,maplist/3,maplist/4,maplist/5]).   51:- autoload(library(error),[must_be/2]).   52:- autoload(library(lists),[subtract/3,same_length/2]).   53:- autoload(library(option),[option/3]).   54:- autoload(library(ordsets), [ord_intersection/3]).   55:- autoload(library(debug), [debug/3, assertion/1]).   56
   57%:- debug(concurrent).
   58
   59:- meta_predicate
   60    concurrent(+, :, +),
   61    concurrent_maplist(1, +),
   62    concurrent_maplist(2, ?, ?),
   63    concurrent_maplist(3, ?, ?, ?),
   64    concurrent_forall(0, 0),
   65    concurrent_forall(0, 0, +),
   66    concurrent_and(0, 0),
   67    concurrent_and(0, 0, +),
   68    first_solution(-, :, +),
   69    call_in_thread(+, 0).   70
   71
   72:- predicate_options(concurrent/3, 3,
   73                     [ pass_to(system:thread_create/3, 3)
   74                     ]).   75:- predicate_options(first_solution/3, 3,
   76                     [ on_fail(oneof([stop,continue])),
   77                       on_error(oneof([stop,continue])),
   78                       pass_to(system:thread_create/3, 3)
   79                     ]).

High level thread primitives

This module defines simple to use predicates for running goals concurrently. Where the core multi-threaded API is targeted at communicating long-living threads, the predicates here are defined to run goals concurrently without having to deal with thread creation and maintenance explicitely.

Note that these predicates run goals concurrently and therefore these goals need to be thread-safe. As the predicates in this module also abort branches of the computation that are no longer needed, predicates that have side-effect must act properly. In a nutshell, this has the following consequences:

author
- Jan Wielemaker */
 concurrent(+N, :Goals, +Options) is semidet
Run Goals in parallel using N threads. This call blocks until all work has been done. The Goals must be independent. They should not communicate using shared variables or any form of global data. All Goals must be thread-safe.

Execution succeeds if all goals have succeeded. If one goal fails or throws an exception, other workers are abandoned as soon as possible and the entire computation fails or re-throws the exception. Note that if multiple goals fail or raise an error it is not defined which error or failure is reported.

On successful completion, variable bindings are returned. Note however that threads have independent stacks and therefore the goal is copied to the worker thread and the result is copied back to the caller of concurrent/3.

Choosing the right number of threads is not always obvious. Here are some scenarios:

Arguments:
N- Number of worker-threads to create. Using 1, no threads are created. If N is larger than the number of Goals we create exactly as many threads as there are Goals.
Goals- List of callable terms.
Options- Passed to thread_create/3 for creating the workers. Only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.
See also
- In many cases, concurrent_maplist/2 and friends is easier to program and is tractable to program analysis.
  158concurrent(1, M:List, _) :-
  159    !,
  160    maplist(once_in_module(M), List).
  161concurrent(N, M:List, Options) :-
  162    must_be(positive_integer, N),
  163    must_be(list(callable), List),
  164    length(List, JobCount),
  165    message_queue_create(Done),
  166    message_queue_create(Queue),
  167    WorkerCount is min(N, JobCount),
  168    create_workers(WorkerCount, Queue, Done, Workers, Options),
  169    submit_goals(List, 1, M, Queue, VarList),
  170    forall(between(1, WorkerCount, _),
  171           thread_send_message(Queue, done)),
  172    VT =.. [vars|VarList],
  173    concur_wait(JobCount, Done, VT, cleanup(Workers, Queue),
  174                Result, [], Exitted),
  175    subtract(Workers, Exitted, RemainingWorkers),
  176    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
  177    (   Result == true
  178    ->  true
  179    ;   Result = false
  180    ->  fail
  181    ;   Result = exception(Error)
  182    ->  throw(Error)
  183    ).
  184
  185once_in_module(M, Goal) :-
  186    call(M:Goal), !.
 submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det
Send all jobs from List to Queue. Each goal is added to Queue as a term goal(Id, Goal, Vars). Vars is unified with a list of lists of free variables appearing in each goal.
  194submit_goals([], _, _, _, []).
  195submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
  196    term_variables(H, Vars),
  197    thread_send_message(Queue, goal(I, M:H, Vars)),
  198    I2 is I + 1,
  199    submit_goals(T, I2, M, Queue, VT).
 concur_wait(+N, +Done:queue, +VT:compound, +Cleanup, -Result, +Exitted0, -Exitted) is semidet
Wait for completion, failure or error.
Arguments:
Exited- List of thread-ids with threads that completed before all work was done.
  210concur_wait(0, _, _, _, true, Exited, Exited) :- !.
  211concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :-
  212    debug(concurrent, 'Concurrent: waiting for workers ...', []),
  213    catch(thread_get_message(Done, Exit), Error,
  214          concur_abort(Error, Cleanup, Done, Exitted0)),
  215    debug(concurrent, 'Waiting: received ~p', [Exit]),
  216    (   Exit = done(Id, Vars)
  217    ->  debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]),
  218        arg(Id, VT, Vars),
  219        N2 is N - 1,
  220        concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted)
  221    ;   Exit = finished(Thread)
  222    ->  thread_join(Thread, JoinStatus),
  223        debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
  224              [Thread, JoinStatus]),
  225        (   JoinStatus == true
  226        ->  concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted)
  227        ;   Status = JoinStatus,
  228            Exitted = [Thread|Exitted0]
  229        )
  230    ).
  231
  232concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :-
  233    debug(concurrent, 'Concurrent: got ~p', [Error]),
  234    subtract(Workers, Exitted, RemainingWorkers),
  235    concur_cleanup(Error, RemainingWorkers, [Queue, Done]),
  236    throw(Error).
  237
  238create_workers(N, Queue, Done, [Id|Ids], Options) :-
  239    N > 0,
  240    !,
  241    thread_create(worker(Queue, Done), Id,
  242                  [ at_exit(thread_send_message(Done, finished(Id)))
  243                  | Options
  244                  ]),
  245    N2 is N - 1,
  246    create_workers(N2, Queue, Done, Ids, Options).
  247create_workers(_, _, _, [], _).
 worker(+WorkQueue, +DoneQueue) is det
Process jobs from WorkQueue and send the results to DoneQueue.
  254worker(Queue, Done) :-
  255    thread_get_message(Queue, Message),
  256    debug(concurrent, 'Worker: received ~p', [Message]),
  257    (   Message = goal(Id, Goal, Vars)
  258    ->  (   Goal
  259        ->  thread_send_message(Done, done(Id, Vars)),
  260            worker(Queue, Done)
  261        )
  262    ;   true
  263    ).
 concur_cleanup(+Result, +Workers:list, +Queues:list) is det
Cleanup the concurrent workers and message queues. If Result is not true, signal all workers to make them stop prematurely. If result is true we assume all workers have been instructed to stop or have stopped themselves.
  273concur_cleanup(Result, Workers, Queues) :-
  274    !,
  275    (   Result == true
  276    ->  true
  277    ;   kill_workers(Workers)
  278    ),
  279    join_all(Workers),
  280    maplist(message_queue_destroy, Queues).
  281
  282kill_workers([]).
  283kill_workers([Id|T]) :-
  284    debug(concurrent, 'Signalling ~w', [Id]),
  285    catch(thread_signal(Id, abort), _, true),
  286    kill_workers(T).
  287
  288join_all([]).
  289join_all([Id|T]) :-
  290    thread_join(Id, _),
  291    join_all(T).
  292
  293
  294		 /*******************************
  295		 *             FORALL		*
  296		 *******************************/
 concurrent_forall(:Generate, :Test) is semidet
 concurrent_forall(:Generate, :Test, +Options) is semidet
True when Test is true for all solutions of Generate. This has the same semantics as forall/2, but the Test goals are executed in multiple threads. Notable a failing Test or a Test throwing an exception signals the calling thread which in turn aborts all workers and fails or re-throws the generated error. Options:
threads(+Count)
Number of threads to use. The default is determined by the Prolog flag cpu_count.
To be done
- Ideally we would grow the set of workers dynamically, similar to dynamic scheduling of HTTP worker threads. This would avoid creating threads that are never used if Generate is too slow or does not provide enough answers and would further raise the number of threads if Test is I/O bound rather than CPU bound.
  317:- dynamic
  318    fa_aborted/1.  319
  320concurrent_forall(Generate, Test) :-
  321    concurrent_forall(Generate, Test, []).
  322
  323concurrent_forall(Generate, Test, Options) :-
  324    jobs(Jobs, Options),
  325    Jobs > 1,
  326    !,
  327    term_variables(Generate, GVars),
  328    term_variables(Test, TVars),
  329    sort(GVars, GVarsS),
  330    sort(TVars, TVarsS),
  331    ord_intersection(GVarsS, TVarsS, Shared),
  332    Templ =.. [v|Shared],
  333    MaxSize is Jobs*4,
  334    message_queue_create(Q, [max_size(MaxSize)]),
  335    length(Workers, Jobs),
  336    thread_self(Me),
  337    maplist(thread_create(fa_worker(Q, Me, Templ, Test)), Workers),
  338    catch(( forall(Generate,
  339                   thread_send_message(Q, job(Templ))),
  340            forall(between(1, Jobs, _),
  341                   thread_send_message(Q, done)),
  342            maplist(thread_join, Workers),
  343            message_queue_destroy(Q)
  344          ),
  345          Error,
  346          fa_cleanup(Error, Workers, Q)).
  347concurrent_forall(Generate, Test, _) :-
  348    forall(Generate, Test).
  349
  350fa_cleanup(Error, Workers, Q) :-
  351    maplist(safe_abort, Workers),
  352    debug(concurrent(fail), 'Joining workers', []),
  353    maplist(safe_join, Workers),
  354    debug(concurrent(fail), 'Destroying queue', []),
  355    retractall(fa_aborted(Q)),
  356    message_queue_destroy(Q),
  357    (   Error = fa_worker_failed(Test, Why)
  358    ->  debug(concurrent(fail), 'Test ~p failed: ~p', [Test, Why]),
  359        (   Why == false
  360        ->  fail
  361        ;   Why = error(E)
  362        ->  throw(E)
  363        ;   assertion(fail)
  364        )
  365    ;   throw(Error)
  366    ).
  367
  368fa_worker(Queue, Main, Templ, Test) :-
  369    repeat,
  370    thread_get_message(Queue, Msg),
  371    (   Msg == done
  372    ->  !
  373    ;   Msg = job(Templ),
  374        debug(concurrent, 'Running test ~p', [Test]),
  375        (   catch_with_backtrace(Test, E, true)
  376        ->  (   var(E)
  377            ->  fail
  378            ;   fa_stop(Queue, Main, fa_worker_failed(Test, error(E)))
  379            )
  380        ;   !,
  381            fa_stop(Queue, Main, fa_worker_failed(Test, false))
  382        )
  383    ).
  384
  385fa_stop(Queue, Main, Why) :-
  386    with_mutex('$concurrent_forall',
  387               fa_stop_sync(Queue, Main, Why)).
  388
  389fa_stop_sync(Queue, _Main, _Why) :-
  390    fa_aborted(Queue),
  391    !.
  392fa_stop_sync(Queue, Main, Why) :-
  393    asserta(fa_aborted(Queue)),
  394    debug(concurrent(fail), 'Stop due to ~p. Signalling ~q', [Why, Main]),
  395    thread_signal(Main, throw(Why)).
  396
  397jobs(Jobs, Options) :-
  398    (   option(threads(Jobs), Options)
  399    ->  true
  400    ;   current_prolog_flag(cpu_count, Jobs)
  401    ->  true
  402    ;   Jobs = 1
  403    ).
  404
  405safe_abort(Thread) :-
  406    catch(thread_signal(Thread, abort), error(_,_), true).
  407safe_join(Thread) :-
  408    E = error(_,_),
  409    catch(thread_join(Thread, _Status), E, true).
  410
  411
  412		 /*******************************
  413		 *              AND		*
  414		 *******************************/
 concurrent_and(:Generator, :Test)
 concurrent_and(:Generator, :Test, +Options)
Concurrent version of (Generator,Test). This predicate creates a thread providing solutions for Generator that are handed to a pool of threads that run Test for the different instantiations provided by Generator concurrently. The predicate is logically equivalent to a simple conjunction except for two aspects: (1) terms are copied from Generator to the test Test threads while answers are copied back to the calling thread and (2) answers may be produced out of order.

If the evaluation of some Test raises an exception, concurrent_and/2,3 is terminated with this exception. If the caller commits after a given answer or raises an exception while concurrent_and/2,3 is active with pending choice points, all involved resources are reclaimed.

Options:

threads(+Count)
Create a worker pool holding Count threads. The default is the Prolog flag cpu_count.

This predicate was proposed by Jan Burse as balance((Generator,Test)).

  443concurrent_and(Gen, Test) :-
  444    concurrent_and(Gen, Test, []).
  445
  446concurrent_and(Gen, Test, Options) :-
  447    jobs(Jobs, Options),
  448    MaxSize is Jobs*4,
  449    message_queue_create(JobQueue, [max_size(MaxSize)]),
  450    message_queue_create(AnswerQueue),
  451    ca_template(Gen, Test, Templ),
  452    term_variables(Gen+Test, AllVars),
  453    ReplyTempl =.. [v|AllVars],
  454    length(Workers, Jobs),
  455    Alive is 1<<Jobs-1,
  456    maplist(thread_create(ca_worker(JobQueue, AnswerQueue,
  457                                    Templ, Test, ReplyTempl)),
  458            Workers),
  459    thread_create(ca_generator(Gen, Templ, JobQueue, AnswerQueue),
  460                  GenThread),
  461    State = state(Alive),
  462    call_cleanup(
  463        ca_gather(State, AnswerQueue, ReplyTempl, Workers),
  464        ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue)).
  465
  466ca_gather(State, AnswerQueue, ReplyTempl, Workers) :-
  467    repeat,
  468       thread_get_message(AnswerQueue, Msg),
  469       (   Msg = true(ReplyTempl)
  470       ->  true
  471       ;   Msg = done(Worker)
  472       ->  nth0(Done, Workers, Worker),
  473           arg(1, State, Alive0),
  474           Alive1 is Alive0 /\ \(1<<Done),
  475           debug(concurrent(and), 'Alive = ~2r', [Alive1]),
  476           (   Alive1 =:= 0
  477           ->  !,
  478               fail
  479           ;   nb_setarg(1, State, Alive1),
  480               fail
  481           )
  482       ;   Msg = error(E)
  483       ->  throw(E)
  484       ).
  485
  486ca_template(Gen, Test, Templ) :-
  487    term_variables(Gen,  GVars),
  488    term_variables(Test, TVars),
  489    sort(GVars, GVarsS),
  490    sort(TVars, TVarsS),
  491    ord_intersection(GVarsS, TVarsS, Shared),
  492    ord_union(GVarsS, Shared, TemplVars),
  493    Templ =.. [v|TemplVars].
  494
  495ca_worker(JobQueue, AnswerQueue, Templ, Test, ReplyTempl) :-
  496    thread_self(Me),
  497    EG = error(existence_error(message_queue, _), _),
  498    repeat,
  499    catch(thread_get_message(JobQueue, Req), EG, Req=all_done),
  500    (   Req = job(Templ)
  501    ->  (   catch(Test, E, true),
  502            (   var(E)
  503            ->  thread_send_message(AnswerQueue, true(ReplyTempl))
  504            ;   thread_send_message(AnswerQueue, error(E))
  505            ),
  506            fail
  507        )
  508    ;   Req == done
  509    ->  !,
  510        message_queue_destroy(JobQueue),
  511        thread_send_message(AnswerQueue, done(Me))
  512    ;   assertion(Req == all_done)
  513    ->  !,
  514        thread_send_message(AnswerQueue, done(Me))
  515    ).
  516
  517ca_generator(Gen, Templ, JobQueue, AnswerQueue) :-
  518    (   catch(Gen, E, true),
  519        (   var(E)
  520        ->  thread_send_message(JobQueue, job(Templ))
  521        ;   thread_send_message(AnswerQueue, error(E))
  522        ),
  523        fail
  524    ;   thread_send_message(JobQueue, done)
  525    ).
  526
  527ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue) :-
  528    safe_abort(GenThread),
  529    safe_join(GenThread),
  530    maplist(safe_abort, Workers),
  531    maplist(safe_join, Workers),
  532    message_queue_destroy(AnswerQueue),
  533    catch(message_queue_destroy(JobQueue), error(_,_), true).
  534
  535
  536                 /*******************************
  537                 *             MAPLIST          *
  538                 *******************************/
 concurrent_maplist(:Goal, +List) is semidet
 concurrent_maplist(:Goal, +List1, +List2) is semidet
 concurrent_maplist(:Goal, +List1, +List2, +List3) is semidet
Concurrent version of maplist/2. This predicate uses concurrent/3, using multiple worker threads. The number of threads is the minimum of the list length and the number of cores available. The number of cores is determined using the prolog flag cpu_count. If this flag is absent or 1 or List has less than two elements, this predicate calls the corresponding maplist/N version using a wrapper based on once/1. Note that all goals are executed as if wrapped in once/1 and therefore these predicates are semidet.

Note that the the overhead of this predicate is considerable and therefore Goal must be fairly expensive before one reaches a speedup.

  557concurrent_maplist(Goal, List) :-
  558    workers(List, WorkerCount),
  559    !,
  560    maplist(ml_goal(Goal), List, Goals),
  561    concurrent(WorkerCount, Goals, []).
  562concurrent_maplist(M:Goal, List) :-
  563    maplist(once_in_module(M, Goal), List).
  564
  565once_in_module(M, Goal, Arg) :-
  566    call(M:Goal, Arg), !.
  567
  568ml_goal(Goal, Elem, call(Goal, Elem)).
  569
  570concurrent_maplist(Goal, List1, List2) :-
  571    same_length(List1, List2),
  572    workers(List1, WorkerCount),
  573    !,
  574    maplist(ml_goal(Goal), List1, List2, Goals),
  575    concurrent(WorkerCount, Goals, []).
  576concurrent_maplist(M:Goal, List1, List2) :-
  577    maplist(once_in_module(M, Goal), List1, List2).
  578
  579once_in_module(M, Goal, Arg1, Arg2) :-
  580    call(M:Goal, Arg1, Arg2), !.
  581
  582ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).
  583
  584concurrent_maplist(Goal, List1, List2, List3) :-
  585    same_length(List1, List2, List3),
  586    workers(List1, WorkerCount),
  587    !,
  588    maplist(ml_goal(Goal), List1, List2, List3, Goals),
  589    concurrent(WorkerCount, Goals, []).
  590concurrent_maplist(M:Goal, List1, List2, List3) :-
  591    maplist(once_in_module(M, Goal), List1, List2, List3).
  592
  593once_in_module(M, Goal, Arg1, Arg2, Arg3) :-
  594    call(M:Goal, Arg1, Arg2, Arg3), !.
  595
  596ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).
  597
  598workers(List, Count) :-
  599    current_prolog_flag(cpu_count, Cores),
  600    Cores > 1,
  601    length(List, Len),
  602    Count is min(Cores,Len),
  603    Count > 1,
  604    !.
  605
  606same_length([], [], []).
  607same_length([_|T1], [_|T2], [_|T3]) :-
  608    same_length(T1, T2, T3).
  609
  610
  611                 /*******************************
  612                 *             FIRST            *
  613                 *******************************/
 first_solution(-X, :Goals, +Options) is semidet
Try alternative solvers concurrently, returning the first answer. In a typical scenario, solving any of the goals in Goals is satisfactory for the application to continue. As soon as one of the tried alternatives is successful, all the others are killed and first_solution/3 succeeds.

For example, if it is unclear whether it is better to search a graph breadth-first or depth-first we can use:

search_graph(Grap, Path) :-
         first_solution(Path, [ breadth_first(Graph, Path),
                                depth_first(Graph, Path)
                              ],
                        []).

Options include thread stack-sizes passed to thread_create, as well as the options on_fail and on_error that specify what to do if a solver fails or triggers an error. By default execution of all solvers is terminated and the result is returned. Sometimes one may wish to continue. One such scenario is if one of the solvers may run out of resources or one of the solvers is known to be incomplete.

on_fail(Action)
If stop (default), terminate all threads and stop with the failure. If continue, keep waiting.
on_error(Action)
As above, re-throwing the error if an error appears.
bug
- first_solution/3 cannot deal with non-determinism. There is no obvious way to fit non-determinism into it. If multiple solutions are needed wrap the solvers in findall/3.
  653first_solution(X, M:List, Options) :-
  654    message_queue_create(Done),
  655    thread_options(Options, ThreadOptions, RestOptions),
  656    length(List, JobCount),
  657    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
  658    wait_for_one(JobCount, Done, Result, RestOptions),
  659    concur_cleanup(kill, Solvers, [Done]),
  660    (   Result = done(_, Var)
  661    ->  X = Var
  662    ;   Result = error(_, Error)
  663    ->  throw(Error)
  664    ).
  665
  666create_solvers([], _, _, _, [], _).
  667create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
  668    thread_create(solve(M:H, X, Done), Id, Options),
  669    create_solvers(T, M, X, Done, IDs, Options).
  670
  671solve(Goal, Var, Queue) :-
  672    thread_self(Me),
  673    (   catch(Goal, E, true)
  674    ->  (   var(E)
  675        ->  thread_send_message(Queue, done(Me, Var))
  676        ;   thread_send_message(Queue, error(Me, E))
  677        )
  678    ;   thread_send_message(Queue, failed(Me))
  679    ).
  680
  681wait_for_one(0, _, failed, _) :- !.
  682wait_for_one(JobCount, Queue, Result, Options) :-
  683    thread_get_message(Queue, Msg),
  684    LeftCount is JobCount - 1,
  685    (   Msg = done(_, _)
  686    ->  Result = Msg
  687    ;   Msg = failed(_)
  688    ->  (   option(on_fail(stop), Options, stop)
  689        ->  Result = Msg
  690        ;   wait_for_one(LeftCount, Queue, Result, Options)
  691        )
  692    ;   Msg = error(_, _)
  693    ->  (   option(on_error(stop), Options, stop)
  694        ->  Result = Msg
  695        ;   wait_for_one(LeftCount, Queue, Result, Options)
  696        )
  697    ).
 thread_options(+Options, -ThreadOptions, -RestOptions) is det
Split the option list over thread(-size) options and other options.
  705thread_options([], [], []).
  706thread_options([H|T], [H|Th], O) :-
  707    thread_option(H),
  708    !,
  709    thread_options(T, Th, O).
  710thread_options([H|T], Th, [H|O]) :-
  711    thread_options(T, Th, O).
  712
  713thread_option(local(_)).
  714thread_option(global(_)).
  715thread_option(trail(_)).
  716thread_option(argument(_)).
  717thread_option(stack(_)).
 call_in_thread(+Thread, :Goal) is semidet
Run Goal as an interrupt in the context of Thread. This is based on thread_signal/2. If waiting times out, we inject a stop(Reason) exception into Goal. Interrupts can be nested, i.e., it is allowed to run a call_in_thread/2 while the target thread is processing such an interrupt.

This predicate is primarily intended for debugging and inspection tasks.

  731call_in_thread(Thread, Goal) :-
  732    thread_self(Thread),
  733    !,
  734    once(Goal).
  735call_in_thread(Thread, Goal) :-
  736    term_variables(Goal, Vars),
  737    thread_self(Me),
  738    A is random(1 000 000 000),
  739    thread_signal(Thread, run_in_thread(Goal,Vars,A,Me)),
  740    catch(thread_get_message(in_thread(A,Result)),
  741          Error,
  742          forward_exception(Thread, A, Error)),
  743    (   Result = true(Vars)
  744    ->  true
  745    ;   Result = error(Error)
  746    ->  throw(Error)
  747    ;   fail
  748    ).
  749
  750run_in_thread(Goal, Vars, Id, Sender) :-
  751    (   catch_with_backtrace(call(Goal), Error, true)
  752    ->  (   var(Error)
  753        ->  thread_send_message(Sender, in_thread(Id, true(Vars)))
  754        ;   Error = stop(_)
  755        ->  true
  756        ;   thread_send_message(Sender, in_thread(Id, error(Error)))
  757        )
  758    ;   thread_send_message(Sender, in_thread(Id, false))
  759    ).
  760
  761forward_exception(Thread, Id, Error) :-
  762    kill_with(Error, Kill),
  763    thread_signal(Thread, kill_task(Id, Kill)),
  764    throw(Error).
  765
  766kill_with(time_limit_exceeded, stop(time_limit_exceeded)) :-
  767    !.
  768kill_with(_, stop(interrupt)).
  769
  770kill_task(Id, Exception) :-
  771    prolog_current_frame(Frame),
  772    prolog_frame_attribute(Frame, parent_goal,
  773                           run_in_thread(_Goal, _Vars, Id, _Sender)),
  774    !,
  775    throw(Exception).
  776kill_task(_, _)