cdf4e29a3e0461dd37f236412e8d02adebb4efd2
[goals.git] / src / jobs.ml
1 (* Goals parallel jobs.
2  * Copyright (C) 2020 Richard W.M. Jones
3  * Copyright (C) 2020 Red Hat Inc.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  *)
19
20 open Utils
21
22 module type Key = sig
23   type t
24   val compare : t -> t -> int
25   val to_string : t -> string
26 end
27
28 module type Jobs = sig
29   type key
30   type group
31   val new_group : unit -> group
32   val start : group -> key -> (unit -> unit) -> unit
33   val wait : group -> unit
34   val stop_all : unit -> unit
35 end
36
37 module Make (K : Key) = struct
38   type key = K.t
39
40   type state = Waiting | Running | Done
41   type job = {
42     mutable state : state;
43     f : unit -> unit;           (* The function to run the job. *)
44     mutable exn : exn option;   (* If the job raised an exception. *)
45   }
46
47   type queue = {
48     (* Lock preventing multiple jobs with the same key from
49      * running at the same time.
50      *)
51     q_lock : Mutex.t;
52     mutable q : job list;       (* List of jobs on this queue. *)
53   }
54   let new_queue () = { q_lock = Mutex.create (); q = [] }
55
56   (* All of the shared state below is protected by this lock that
57    * you must hold before using any of it.
58    *)
59   let lock = Mutex.create ()
60
61   (* Jobs are queued on separate queues according to their key.
62    * qs is a map of the key to the list of jobs in that queue.
63    *)
64   module Qs = Map.Make (K)
65   let qs = ref Qs.empty
66
67   (* Threads which are idle wait on this condition.  This is
68    * signalled when:
69    *  - a new job is added (idle threads may be able to run it)
70    *  - a job finishes (idle threads may be able to run another
71    *    job which has the same key as the one which finished)
72    *)
73   let idle = Condition.create ()
74
75   (* Threads which are running or idle but NOT waiting.  This
76    * starts as one because the main thread is running.  A thread
77    * which is waiting is essentially blocking another job which
78    * could run, so we should start a new thread.  A thread which
79    * is idle on the other hand is not blocking anything from
80    * running, it's idle because there is nothing that can be run.
81    *
82    * We aim to keep this <= Cmdline.nr_jobs.
83    *)
84   let ready = ref 1
85
86   (* If stop_all is called, this is set to true and we stop
87    * running new jobs.
88    *)
89   let stop = ref false
90
91   (* The worker thread. *)
92   let rec worker _ =
93     let id = Thread.id (Thread.self ()) in
94     Mutex.lock lock;
95     incr ready;
96     while not !stop && !ready <= Cmdline.nr_jobs () do
97       (* See if there's any queue with a job which is ready to run. *)
98       Cmdline.debug "thread %d: checking for a runnable queue" id;
99       match get_runnable_queue () with
100       | None ->
101          (* Nothing that we can run, go idle.  This also drops
102           * the lock so other threads can examine the queue.
103           *)
104          Cmdline.debug "thread %d: idle" id;
105          Condition.wait idle lock;
106       | Some q ->
107          (* Note that q.q_lock is now held by this thread, and q.q
108           * is non-empty.  Pick the job off the head of this queue.
109           *)
110          let job = List.hd q.q in
111          q.q <- List.tl q.q;
112
113          (* Run the job, dropping the main lock while running. *)
114          job.state <- Running;
115          Mutex.unlock lock;
116          Cmdline.debug "thread %d: running job" id;
117          let exn = try job.f (); None with exn -> Some exn in
118          Cmdline.debug "thread %d: finished job" id;
119          Mutex.lock lock;
120          job.state <- Done;
121          job.exn <- exn;
122          (* Since we have finished a job, it may be that other
123           * idle threads could now run (if a job with the same
124           * key is waiting).
125           *)
126          Mutex.unlock q.q_lock;
127          Condition.broadcast idle
128     done;
129     decr ready;
130     Mutex.unlock lock
131
132   (* Check all the queues to see if there is any job which can run.
133    * The lock must be held when calling this function.  This
134    * locks the queue if it finds one.
135    *)
136   and get_runnable_queue () =
137     try
138       let qs = List.map snd (Qs.bindings !qs) in
139       Some (List.find is_runnable_queue qs)
140     with
141       Not_found -> None
142
143   (* Return true iff the queue contains jobs and no existing job
144    * from this queue is already running.  This locks the queue
145    * if it returns true.
146    *)
147   and is_runnable_queue = function
148     | { q = [] } -> false
149     | { q_lock } -> Mutex.try_lock q_lock
150
151   (* A group is simply a list of jobs. *)
152   type group = job list ref
153   let new_group () = ref []
154
155   (* Submit a new job. *)
156   let start group key f =
157     let id = Thread.id (Thread.self ()) in
158     Cmdline.debug "thread %d: submitting new job" id;
159     Mutex.lock lock;
160     let job = { state = Waiting; f; exn = None } in
161     group := job :: !group;
162
163     (* Put the job on the queue associated with this key. *)
164     let q =
165       try Qs.find key !qs
166       with Not_found ->
167         let q = new_queue () in (* Allocate a new queue for this key. *)
168         qs := Qs.add key q !qs;
169         q in
170     q.q <- q.q @ [job];
171
172     (* Wake up any idle threads. *)
173     Condition.signal idle;
174     Mutex.unlock lock
175
176   (* Wait for all jobs in the group to be done. *)
177   let rec wait group =
178     let id = Thread.id (Thread.self ()) in
179     Mutex.lock lock;
180     while not !stop && not (all_done group); do
181       decr ready;
182       (* Start more threads if fewer than nr_jobs threads are ready. *)
183       let needed = Cmdline.nr_jobs () - !ready in
184       if not !stop && needed > 0 then
185         ignore (Array.init needed (Thread.create worker));
186
187       Cmdline.debug "thread %d: waiting for group to complete" id;
188       Condition.wait idle lock;
189       incr ready
190     done;
191     Mutex.unlock lock;
192
193     (* If any job in the group raised an exception, we re-raise it here.
194      * We can only reraise the first exception though so if there are
195      * multiple failures then the exceptions are lost, but that doesn't
196      * really matter as long as goals exits with an error.  Note that if
197      * we are being called from a worker, then the exception we raise
198      * here may be saved and reraised in another wait.
199      *)
200     List.iter (
201       fun job ->
202         match job.exn with
203         | None -> ()
204         | Some exn ->
205            Cmdline.debug "thread %d: raising exception in wait: %s"
206              id (Printexc.to_string exn);
207            stop := true;
208            raise exn
209     ) !group;
210
211     (* It can happen that we didn't finish all jobs, especially if
212      * the stop flag was set in another thread.  In this case we
213      * shouldn't just return as if everything is fine because it
214      * will cause the job to continue.  Instead make sure we
215      * raise an error in this case.
216      *)
217     if not (all_done group) then
218       failwithf "job cancelled because of earlier error"
219
220   (* Test if all jobs in a group are done.  Note you must hold
221    * the lock.
222    *)
223   and all_done group = List.for_all (fun { state } -> state = Done) !group
224
225   let stop_all () =
226     Mutex.lock lock;
227     (* All threads will exit after running jobs if this is set. *)
228     stop := true;
229     while !ready > 1 do
230       Condition.wait idle lock;
231     done;
232     Mutex.unlock lock
233
234 end