Implement parallel jobs (-j option).
[goals.git] / src / jobs.ml
1 (* Goals parallel jobs.
2  * Copyright (C) 2020 Richard W.M. Jones
3  * Copyright (C) 2020 Red Hat Inc.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  *)
19
20 module type Key = sig
21   type t
22   val compare : t -> t -> int
23   val to_string : t -> string
24 end
25
26 module type Jobs = sig
27   type key
28   type group
29   val new_group : unit -> group
30   val start : group -> key -> (unit -> unit) -> unit
31   val wait : group -> unit
32 end
33
34 module Make (K : Key) = struct
35   type key = K.t
36
37   type state = Waiting | Running | Done
38   type job = {
39     mutable state : state;
40     f : unit -> unit;           (* The function to run the job. *)
41   }
42
43   type queue = {
44     (* Lock preventing multiple jobs with the same key from
45      * running at the same time.
46      *)
47     q_lock : Mutex.t;
48     mutable q : job list;       (* List of jobs on this queue. *)
49   }
50   let new_queue () = { q_lock = Mutex.create (); q = [] }
51
52   (* All of the shared state below is protected by this lock that
53    * you must hold before using any of it.
54    *)
55   let lock = Mutex.create ()
56
57   (* Jobs are queued on separate queues according to their key.
58    * qs is a map of the key to the list of jobs in that queue.
59    *)
60   module Qs = Map.Make (K)
61   let qs = ref Qs.empty
62
63   (* Threads which are idle wait on this condition.  This is
64    * signalled when:
65    *  - a new job is added (idle threads may be able to run it)
66    *  - a job finishes (idle threads may be able to run another
67    *    job which has the same key as the one which finished)
68    *)
69   let idle = Condition.create ()
70
71   (* Threads which are running or idle but NOT waiting.  This
72    * starts as one because the main thread is running.  A thread
73    * which is waiting is essentially blocking another job which
74    * could run, so we should start a new thread.  A thread which
75    * is idle on the other hand is not blocking anything from
76    * running, it's idle because there is nothing that can be run.
77    *
78    * We aim to keep this <= Cmdline.nr_jobs.
79    *)
80   let ready = ref 1
81
82   (* The worker thread. *)
83   let rec worker _ =
84     let id = Thread.id (Thread.self ()) in
85     Mutex.lock lock;
86     incr ready;
87     while !ready <= Cmdline.nr_jobs do
88       (* See if there's any queue with a job which is ready to run. *)
89       Cmdline.debug "thread %d: checking for a runnable queue" id;
90       match get_runnable_queue () with
91       | None ->
92          (* Nothing that we can run, go idle.  This also drops
93           * the lock so other threads can examine the queue.
94           *)
95          Cmdline.debug "thread %d: idle" id;
96          Condition.wait idle lock;
97       | Some q ->
98          (* Note that q.q_lock is now held by this thread, and q.q
99           * is non-empty.  Pick the job off the head of this queue.
100           *)
101          let job = List.hd q.q in
102          q.q <- List.tl q.q;
103
104          (* Run the job, dropping the main lock while running. *)
105          job.state <- Running;
106          Mutex.unlock lock;
107          Cmdline.debug "thread %d: running job" id;
108          job.f ();
109          Cmdline.debug "thread %d: finished job" id;
110          Mutex.lock lock;
111          job.state <- Done;
112          (* Since we have finished a job, it may be that other
113           * idle threads could now run (if a job with the same
114           * key is waiting).
115           *)
116          Mutex.unlock q.q_lock;
117          Condition.broadcast idle
118     done;
119     decr ready;
120     Mutex.unlock lock
121
122   (* Check all the queues to see if there is any job which can run.
123    * The lock must be held when calling this function.  This
124    * locks the queue if it finds one.
125    *)
126   and get_runnable_queue () =
127     try
128       let qs = List.map snd (Qs.bindings !qs) in
129       Some (List.find is_runnable_queue qs)
130     with
131       Not_found -> None
132
133   (* Return true iff the queue contains jobs and no existing job
134    * from this queue is already running.  This locks the queue
135    * if it returns true.
136    *)
137   and is_runnable_queue = function
138     | { q = [] } -> false
139     | { q_lock } -> Mutex.try_lock q_lock
140
141   (* A group is simply a list of jobs. *)
142   type group = job list ref
143   let new_group () = ref []
144
145   (* Submit a new job. *)
146   let start group key f =
147     let id = Thread.id (Thread.self ()) in
148     Cmdline.debug "thread %d: submitting new job" id;
149     Mutex.lock lock;
150     let job = { state = Waiting; f } in
151     group := job :: !group;
152
153     (* Put the job on the queue associated with this key. *)
154     let q =
155       try Qs.find key !qs
156       with Not_found ->
157         let q = new_queue () in (* Allocate a new queue for this key. *)
158         qs := Qs.add key q !qs;
159         q in
160     q.q <- q.q @ [job];
161
162     (* Wake up any idle threads. *)
163     Condition.signal idle;
164     Mutex.unlock lock
165
166   (* Wait for all jobs in the group to be done. *)
167   let rec wait group =
168     let id = Thread.id (Thread.self ()) in
169     Mutex.lock lock;
170     while not (all_done group); do
171       decr ready;
172       (* Start more threads if fewer than nr_jobs threads are ready. *)
173       let needed = Cmdline.nr_jobs - !ready in
174       if needed > 0 then
175         ignore (Array.init needed (Thread.create worker));
176
177       Cmdline.debug "thread %d: waiting for group to complete" id;
178       Condition.wait idle lock;
179       incr ready
180     done;
181     Mutex.unlock lock
182
183   (* Test if all jobs in a group are done.  Note you must hold
184    * the lock.
185    *)
186   and all_done group = List.for_all (fun { state } -> state = Done) !group
187
188 end