jobs: Introduce stop_all function to stop job submission on error.
[goals.git] / src / jobs.ml
1 (* Goals parallel jobs.
2  * Copyright (C) 2020 Richard W.M. Jones
3  * Copyright (C) 2020 Red Hat Inc.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  *)
19
20 module type Key = sig
21   type t
22   val compare : t -> t -> int
23   val to_string : t -> string
24 end
25
26 module type Jobs = sig
27   type key
28   type group
29   val new_group : unit -> group
30   val start : group -> key -> (unit -> unit) -> unit
31   val wait : group -> unit
32   val stop_all : unit -> unit
33 end
34
35 module Make (K : Key) = struct
36   type key = K.t
37
38   type state = Waiting | Running | Done
39   type job = {
40     mutable state : state;
41     f : unit -> unit;           (* The function to run the job. *)
42   }
43
44   type queue = {
45     (* Lock preventing multiple jobs with the same key from
46      * running at the same time.
47      *)
48     q_lock : Mutex.t;
49     mutable q : job list;       (* List of jobs on this queue. *)
50   }
51   let new_queue () = { q_lock = Mutex.create (); q = [] }
52
53   (* All of the shared state below is protected by this lock that
54    * you must hold before using any of it.
55    *)
56   let lock = Mutex.create ()
57
58   (* Jobs are queued on separate queues according to their key.
59    * qs is a map of the key to the list of jobs in that queue.
60    *)
61   module Qs = Map.Make (K)
62   let qs = ref Qs.empty
63
64   (* Threads which are idle wait on this condition.  This is
65    * signalled when:
66    *  - a new job is added (idle threads may be able to run it)
67    *  - a job finishes (idle threads may be able to run another
68    *    job which has the same key as the one which finished)
69    *)
70   let idle = Condition.create ()
71
72   (* Threads which are running or idle but NOT waiting.  This
73    * starts as one because the main thread is running.  A thread
74    * which is waiting is essentially blocking another job which
75    * could run, so we should start a new thread.  A thread which
76    * is idle on the other hand is not blocking anything from
77    * running, it's idle because there is nothing that can be run.
78    *
79    * We aim to keep this <= Cmdline.nr_jobs.
80    *)
81   let ready = ref 1
82
83   (* If stop_all is called, this is set to true and we stop
84    * running new jobs.
85    *)
86   let stop = ref false
87
88   (* The worker thread. *)
89   let rec worker _ =
90     let id = Thread.id (Thread.self ()) in
91     Mutex.lock lock;
92     incr ready;
93     while not !stop && !ready <= Cmdline.nr_jobs do
94       (* See if there's any queue with a job which is ready to run. *)
95       Cmdline.debug "thread %d: checking for a runnable queue" id;
96       match get_runnable_queue () with
97       | None ->
98          (* Nothing that we can run, go idle.  This also drops
99           * the lock so other threads can examine the queue.
100           *)
101          Cmdline.debug "thread %d: idle" id;
102          Condition.wait idle lock;
103       | Some q ->
104          (* Note that q.q_lock is now held by this thread, and q.q
105           * is non-empty.  Pick the job off the head of this queue.
106           *)
107          let job = List.hd q.q in
108          q.q <- List.tl q.q;
109
110          (* Run the job, dropping the main lock while running. *)
111          job.state <- Running;
112          Mutex.unlock lock;
113          Cmdline.debug "thread %d: running job" id;
114          job.f ();
115          Cmdline.debug "thread %d: finished job" id;
116          Mutex.lock lock;
117          job.state <- Done;
118          (* Since we have finished a job, it may be that other
119           * idle threads could now run (if a job with the same
120           * key is waiting).
121           *)
122          Mutex.unlock q.q_lock;
123          Condition.broadcast idle
124     done;
125     decr ready;
126     Mutex.unlock lock
127
128   (* Check all the queues to see if there is any job which can run.
129    * The lock must be held when calling this function.  This
130    * locks the queue if it finds one.
131    *)
132   and get_runnable_queue () =
133     try
134       let qs = List.map snd (Qs.bindings !qs) in
135       Some (List.find is_runnable_queue qs)
136     with
137       Not_found -> None
138
139   (* Return true iff the queue contains jobs and no existing job
140    * from this queue is already running.  This locks the queue
141    * if it returns true.
142    *)
143   and is_runnable_queue = function
144     | { q = [] } -> false
145     | { q_lock } -> Mutex.try_lock q_lock
146
147   (* A group is simply a list of jobs. *)
148   type group = job list ref
149   let new_group () = ref []
150
151   (* Submit a new job. *)
152   let start group key f =
153     let id = Thread.id (Thread.self ()) in
154     Cmdline.debug "thread %d: submitting new job" id;
155     Mutex.lock lock;
156     let job = { state = Waiting; f } in
157     group := job :: !group;
158
159     (* Put the job on the queue associated with this key. *)
160     let q =
161       try Qs.find key !qs
162       with Not_found ->
163         let q = new_queue () in (* Allocate a new queue for this key. *)
164         qs := Qs.add key q !qs;
165         q in
166     q.q <- q.q @ [job];
167
168     (* Wake up any idle threads. *)
169     Condition.signal idle;
170     Mutex.unlock lock
171
172   (* Wait for all jobs in the group to be done. *)
173   let rec wait group =
174     let id = Thread.id (Thread.self ()) in
175     Mutex.lock lock;
176     while not (all_done group); do
177       decr ready;
178       (* Start more threads if fewer than nr_jobs threads are ready. *)
179       let needed = Cmdline.nr_jobs - !ready in
180       if needed > 0 then
181         ignore (Array.init needed (Thread.create worker));
182
183       Cmdline.debug "thread %d: waiting for group to complete" id;
184       Condition.wait idle lock;
185       incr ready
186     done;
187     Mutex.unlock lock
188
189   (* Test if all jobs in a group are done.  Note you must hold
190    * the lock.
191    *)
192   and all_done group = List.for_all (fun { state } -> state = Done) !group
193
194   let stop_all () =
195     Mutex.lock lock;
196     (* All threads will exit after running jobs if this is set. *)
197     stop := true;
198     while !ready > 1 do
199       Condition.wait idle lock;
200     done;
201     Mutex.unlock lock
202
203 end