View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2008-2016, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(thread_pool,
   37          [ thread_pool_create/3,       % +Pool, +Size, +Options
   38            thread_pool_destroy/1,      % +Pool
   39            thread_create_in_pool/4,    % +Pool, :Goal, -Id, +Options
   40
   41            current_thread_pool/1,      % ?Pool
   42            thread_pool_property/2      % ?Pool, ?Property
   43          ]).   44:- use_module(library(debug),[debug/3]).   45:- autoload(library(error),[must_be/2,type_error/2]).   46:- autoload(library(lists),[member/2,delete/3]).   47:- autoload(library(option),
   48	    [meta_options/3,select_option/4,merge_options/3,option/3]).   49:- autoload(library(rbtrees),
   50	    [ rb_new/1,
   51	      rb_insert_new/4,
   52	      rb_delete/3,
   53	      rb_keys/2,
   54	      rb_lookup/3,
   55	      rb_update/4
   56	    ]).   57
   58
   59/** <module> Resource bounded thread management
   60
   61The module library(thread_pool) manages threads in pools. A pool defines
   62properties of its member threads and the  maximum number of threads that
   63can coexist in the pool. The   call  thread_create_in_pool/4 allocates a
   64thread in the pool, just like  thread_create/3.   If  the  pool is fully
   65allocated it can be asked to wait or raise an error.
   66
   67The library has been designed  to   deal  with  server applications that
   68receive a variety of requests, such as   HTTP servers. Simply starting a
   69thread for each request is a bit too simple minded for such servers:
   70
   71    * Creating many CPU intensive threads often leads to a slow-down
   72    rather than a speedup.
   73    * Creating many memory intensive threads may exhaust resources
   74    * Tasks that require little CPU and memory but take long waiting
   75    for external resources can run many threads.
   76
   77Using this library, one can define a  pool   for  each set of tasks with
   78comparable characteristics and create threads in   this pool. Unlike the
   79worker-pool model, threads are not started immediately. Depending on the
   80design, both approaches can be attractive.
   81
   82The library is implemented by means of   a manager thread with the fixed
   83thread id =|__thread_pool_manager|=. All  state   is  maintained in this
   84manager thread, which receives and  processes   requests  to  create and
   85destroy pools, create  threads  in  a   pool  and  handle  messages from
   86terminated threads. Thread pools are _not_ saved   in  a saved state and
   87must therefore be recreated  using   the  initialization/1  directive or
   88otherwise during startup of the application.
   89
   90@see http_handler/3 and http_spawn/2.
   91*/
   92
   93:- meta_predicate
   94    thread_create_in_pool(+, 0, -, :).   95:- predicate_options(thread_create_in_pool/4, 4,
   96                     [ wait(boolean),
   97                       pass_to(system:thread_create/3, 3)
   98                     ]).   99
  100:- multifile
  101    create_pool/1.  102
  103%!  thread_pool_create(+Pool, +Size, +Options) is det.
  104%
  105%   Create a pool of threads. A pool of threads is a declaration for
  106%   creating threads with shared  properties   (stack  sizes)  and a
  107%   limited  number  of  threads.   Threads    are   created   using
  108%   thread_create_in_pool/4. If all threads in the  pool are in use,
  109%   the   behaviour   depends   on    the     =wait=    option    of
  110%   thread_create_in_pool/4  and  the  =backlog=   option  described
  111%   below.  Options are passed to thread_create/3, except for
  112%
  113%       * backlog(+MaxBackLog)
  114%       Maximum number of requests that can be suspended.  Default
  115%       is =infinite=.  Otherwise it must be a non-negative integer.
  116%       Using backlog(0) will never delay thread creation for this
  117%       pool.
  118%
  119%   The pooling mechanism does _not_   interact  with the =detached=
  120%   state of a thread. Threads can   be  created both =detached= and
  121%   normal and must be joined using   thread_join/2  if they are not
  122%   detached.
  123
  124thread_pool_create(Name, Size, Options) :-
  125    must_be(list, Options),
  126    pool_manager(Manager),
  127    thread_self(Me),
  128    thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
  129    wait_reply.
  130
  131%!  thread_pool_destroy(+Name) is det.
  132%
  133%   Destroy the thread pool named Name.
  134%
  135%   @error  existence_error(thread_pool, Name).
  136
  137thread_pool_destroy(Name) :-
  138    pool_manager(Manager),
  139    thread_self(Me),
  140    thread_send_message(Manager, destroy_pool(Name, Me)),
  141    wait_reply.
  142
  143
  144%!  current_thread_pool(?Name) is nondet.
  145%
  146%   True if Name refers to a defined thread pool.
  147
  148current_thread_pool(Name) :-
  149    pool_manager(Manager),
  150    thread_self(Me),
  151    thread_send_message(Manager, current_pools(Me)),
  152    wait_reply(Pools),
  153    (   atom(Name)
  154    ->  memberchk(Name, Pools)
  155    ;   member(Name, Pools)
  156    ).
  157
  158%!  thread_pool_property(?Name, ?Property) is nondet.
  159%
  160%   True if Property is a property of thread pool Name. Defined
  161%   properties are:
  162%
  163%       * options(Options)
  164%       Thread creation options for this pool
  165%       * free(Size)
  166%       Number of free slots on this pool
  167%       * size(Size)
  168%       Total number of slots on this pool
  169%       * members(ListOfIDs)
  170%       ListOfIDs is the list or threads running in this pool
  171%       * running(Running)
  172%       Number of running threads in this pool
  173%       * backlog(Size)
  174%       Number of delayed thread creations on this pool
  175
  176thread_pool_property(Name, Property) :-
  177    current_thread_pool(Name),
  178    pool_manager(Manager),
  179    thread_self(Me),
  180    thread_send_message(Manager, pool_properties(Me, Name, Property)),
  181    wait_reply(Props),
  182    (   nonvar(Property)
  183    ->  memberchk(Property, Props)
  184    ;   member(Property, Props)
  185    ).
  186
  187
  188%!  thread_create_in_pool(+Pool, :Goal, -Id, +Options) is det.
  189%
  190%   Create  a  thread  in  Pool.  Options  overrule  default  thread
  191%   creation options associated  to  the   pool.  In  addition,  the
  192%   following option is defined:
  193%
  194%       * wait(+Boolean)
  195%       If =true= (default) and the pool is full, wait until a
  196%       member of the pool completes.  If =false=, throw a
  197%       resource_error.
  198%
  199%   @error  resource_error(threads_in_pool(Pool)) is raised if wait
  200%           is =false= or the backlog limit has been reached.
  201%   @error  existence_error(thread_pool, Pool) if Pool does not
  202%           exist.
  203
  204thread_create_in_pool(Pool, Goal, Id, QOptions) :-
  205    meta_options(is_meta, QOptions, Options),
  206    catch(thread_create_in_pool_(Pool, Goal, Id, Options),
  207          Error, true),
  208    (   var(Error)
  209    ->  true
  210    ;   Error = error(existence_error(thread_pool, Pool), _),
  211        create_pool_lazily(Pool)
  212    ->  thread_create_in_pool_(Pool, Goal, Id, Options)
  213    ;   throw(Error)
  214    ).
  215
  216thread_create_in_pool_(Pool, Goal, Id, Options) :-
  217    select_option(wait(Wait), Options, ThreadOptions, true),
  218    pool_manager(Manager),
  219    thread_self(Me),
  220    thread_send_message(Manager,
  221                        create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
  222    wait_reply(Id).
  223
  224is_meta(at_exit).
  225
  226
  227%!  create_pool_lazily(+Pool) is semidet.
  228%
  229%   Call the hook create_pool/1 to create the pool lazily.
  230
  231create_pool_lazily(Pool) :-
  232    with_mutex(Pool,
  233               ( mutex_destroy(Pool),
  234                 create_pool_sync(Pool)
  235               )).
  236
  237create_pool_sync(Pool) :-
  238    current_thread_pool(Pool),
  239    !.
  240create_pool_sync(Pool) :-
  241    create_pool(Pool).
  242
  243
  244                 /*******************************
  245                 *        START MANAGER         *
  246                 *******************************/
  247
  248%!  pool_manager(-ThreadID) is det.
  249%
  250%   ThreadID is the thread (alias) identifier of the manager. Starts
  251%   the manager if it is not running.
  252
  253pool_manager(TID) :-
  254    TID = '__thread_pool_manager',
  255    (   thread_running(TID)
  256    ->  true
  257    ;   with_mutex('__thread_pool', create_pool_manager(TID))
  258    ).
  259
  260thread_running(Thread) :-
  261    catch(thread_property(Thread, status(Status)),
  262          E, true),
  263    (   var(E)
  264    ->  (   Status == running
  265        ->  true
  266        ;   thread_join(Thread, _),
  267            print_message(warning, thread_pool(manager_died(Status))),
  268            fail
  269        )
  270    ;   E = error(existence_error(thread, Thread), _)
  271    ->  fail
  272    ;   throw(E)
  273    ).
  274
  275create_pool_manager(Thread) :-
  276    thread_running(Thread),
  277    !.
  278create_pool_manager(Thread) :-
  279    thread_create(pool_manager_main, _,
  280                  [ alias(Thread),
  281                    inherit_from(main)
  282                  ]).
  283
  284
  285pool_manager_main :-
  286    rb_new(State0),
  287    manage_thread_pool(State0).
  288
  289
  290                 /*******************************
  291                 *        MANAGER LOGIC         *
  292                 *******************************/
  293
  294%!  manage_thread_pool(+State)
  295
  296manage_thread_pool(State0) :-
  297    thread_get_message(Message),
  298    (   update_thread_pool(Message, State0, State)
  299    ->  debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
  300        manage_thread_pool(State)
  301    ;   format(user_error, 'Update failed: ~p~n', [Message])
  302    ).
  303
  304
  305update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
  306    !,
  307    (   rb_insert_new(State0,
  308                      Name, tpool(Options, Size, Size, WP, WP, []),
  309                      State)
  310    ->  thread_send_message(For, thread_pool(true))
  311    ;   reply_error(For, permission_error(create, thread_pool, Name)),
  312        State = State0
  313    ).
  314update_thread_pool(destroy_pool(Name, For), State0, State) :-
  315    !,
  316    (   rb_delete(State0, Name, State)
  317    ->  thread_send_message(For, thread_pool(true))
  318    ;   reply_error(For, existence_error(thread_pool, Name)),
  319        State = State0
  320    ).
  321update_thread_pool(current_pools(For), State, State) :-
  322    !,
  323    rb_keys(State, Keys),
  324    debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
  325    reply(For, Keys).
  326update_thread_pool(pool_properties(For, Name, P), State, State) :-
  327    !,
  328    (   rb_lookup(Name, Pool, State)
  329    ->  findall(P, pool_property(P, Pool), List),
  330        reply(For, List)
  331    ;   reply_error(For, existence_error(thread_pool, Name))
  332    ).
  333update_thread_pool(Message, State0, State) :-
  334    arg(1, Message, Name),
  335    (   rb_lookup(Name, Pool0, State0)
  336    ->  update_pool(Message, Pool0, Pool),
  337        rb_update(State0, Name, Pool, State)
  338    ;   State = State0,
  339        (   Message = create(Name, _, For, _, _, _)
  340        ->  reply_error(For, existence_error(thread_pool, Name))
  341        ;   true
  342        )
  343    ).
  344
  345pool_property(options(Options),
  346              tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
  347pool_property(backlog(Size),
  348              tpool(_, _Free, _Size, WP, WPT, _Members)) :-
  349    diff_list_length(WP, WPT, Size).
  350pool_property(free(Free),
  351              tpool(_, Free, _Size, _, _, _)).
  352pool_property(size(Size),
  353              tpool(_, _Free, Size, _, _, _)).
  354pool_property(running(Count),
  355              tpool(_, Free, Size, _, _, _)) :-
  356    Count is Size - Free.
  357pool_property(members(IDList),
  358              tpool(_, _, _, _, _, IDList)).
  359
  360diff_list_length(List, Tail, Size) :-
  361    '$skip_list'(Length, List, Rest),
  362    (   Rest == Tail
  363    ->  Size = Length
  364    ;   type_error(difference_list, List/Tail)
  365    ).
  366
  367
  368%!  update_pool(+Message, +Pool0, -Pool) is det.
  369%
  370%   Deal with create requests and  completion   messages  on a given
  371%   pool.  There are two messages:
  372%
  373%       * create(PoolName, Goal, ForThread, Wait, Id, Options)
  374%       Create a new thread on behalf of ForThread.  There are
  375%       two cases:
  376%            * Free slots: create the thread
  377%            * No free slots: error or add to waiting
  378%       * exitted(PoolName, Thread)
  379%       A thread completed.  If there is a request waiting,
  380%       create a new one.
  381
  382update_pool(create(Name, Goal, For, _, Id, MyOptions),
  383            tpool(Options, Free0, Size, WP, WPT, Members0),
  384            tpool(Options, Free, Size, WP, WPT, Members)) :-
  385    succ(Free, Free0),
  386    !,
  387    merge_options(MyOptions, Options, ThreadOptions),
  388    select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
  389    catch(thread_create(Goal, Id,
  390                        [ at_exit(worker_exitted(Name, Id, AtExit))
  391                        | ThreadOptions1
  392                        ]),
  393          E, true),
  394    (   var(E)
  395    ->  Members = [Id|Members0],
  396        reply(For, Id)
  397    ;   reply_error(For, E),
  398        Members = Members0
  399    ).
  400update_pool(Create,
  401            tpool(Options, 0, Size, WP, WPT0, Members),
  402            tpool(Options, 0, Size, WP, WPT, Members)) :-
  403    Create = create(Name, _Goal, For, Wait, _, _Options),
  404    !,
  405    option(backlog(BackLog), Options, infinite),
  406    (   can_delay(Wait, BackLog, WP, WPT0)
  407    ->  WPT0 = [Create|WPT],
  408        debug(thread_pool, 'Delaying ~p', [Create])
  409    ;   WPT = WPT0,
  410        reply_error(For, resource_error(threads_in_pool(Name)))
  411    ).
  412update_pool(exitted(_Name, Id),
  413            tpool(Options, Free0, Size, WP0, WPT, Members0),
  414            Pool) :-
  415    succ(Free0, Free),
  416    delete(Members0, Id, Members1),
  417    Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
  418    (   WP0 == WPT
  419    ->  WP = WP0,
  420        Pool = Pool1
  421    ;   WP0 = [Waiting|WP],
  422        debug(thread_pool, 'Start delayed ~p', [Waiting]),
  423        update_pool(Waiting, Pool1, Pool)
  424    ).
  425
  426
  427can_delay(true, infinite, _, _) :- !.
  428can_delay(true, BackLog, WP, WPT) :-
  429    diff_list_length(WP, WPT, Size),
  430    BackLog > Size.
  431
  432%!  worker_exitted(+PoolName, +WorkerId, :AtExit)
  433%
  434%   It is possible that  '__thread_pool_manager'   no  longer exists
  435%   while closing down the process because   the  manager was killed
  436%   before the worker.
  437%
  438%   @tbd Find a way to discover that we are terminating Prolog.
  439
  440:- public
  441    worker_exitted/3.  442
  443worker_exitted(Name, Id, AtExit) :-
  444    catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
  445          _, true),
  446    call(AtExit).
  447
  448
  449                 /*******************************
  450                 *             UTIL             *
  451                 *******************************/
  452
  453reply(To, Term) :-
  454    thread_send_message(To, thread_pool(true(Term))).
  455
  456reply_error(To, Error) :-
  457    thread_send_message(To, thread_pool(error(Error, _))).
  458
  459wait_reply :-
  460    thread_get_message(thread_pool(Result)),
  461    (   Result == true
  462    ->  true
  463    ;   Result == fail
  464    ->  fail
  465    ;   throw(Result)
  466    ).
  467
  468wait_reply(Value) :-
  469    thread_get_message(thread_pool(Reply)),
  470    (   Reply = true(Value0)
  471    ->  Value = Value0
  472    ;   Reply == fail
  473    ->  fail
  474    ;   throw(Reply)
  475    ).
  476
  477
  478                 /*******************************
  479                 *             HOOKS            *
  480                 *******************************/
  481
  482%!  create_pool(+PoolName) is semidet.
  483%
  484%   Hook to create a thread  pool  lazily.   The  hook  is called if
  485%   thread_create_in_pool/4 discovers that the thread  pool does not
  486%   exist. If the  hook   succeeds,  thread_create_in_pool/4 retries
  487%   creating the thread. For  example,  we   can  use  the following
  488%   declaration to create threads in the pool =media=, which holds a
  489%   maximum of 20 threads.
  490%
  491%     ==
  492%     :- multifile thread_pool:create_pool/1.
  493%
  494%     thread_pool:create_pool(media) :-
  495%         thread_pool_create(media, 20, []).
  496%     ==
  497
  498                 /*******************************
  499                 *            MESSAGES          *
  500                 *******************************/
  501:- multifile
  502    prolog:message/3.  503
  504prolog:message(thread_pool(Message)) -->
  505    message(Message).
  506
  507message(manager_died(Status)) -->
  508    [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ]