1 (* Goals parallel jobs.
2 * Copyright (C) 2020 Richard W.M. Jones
3 * Copyright (C) 2020 Red Hat Inc.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
24 val compare : t -> t -> int
25 val to_string : t -> string
28 module type Jobs = sig
31 val new_group : unit -> group
32 val start : group -> key -> (unit -> unit) -> unit
33 val wait : group -> unit
34 val stop_all : unit -> unit
37 module Make (K : Key) = struct
40 type state = Waiting | Running | Done (* Waiting: created by start; Running: set before a worker calls f; Done: what all_done checks for. *)
42 mutable state : state;
43 f : unit -> unit; (* The function to run the job. *)
44 mutable exn : exn option; (* If the job raised an exception. *)
48 (* Lock preventing multiple jobs with the same key from
49 * running at the same time.
52 mutable q : job list; (* List of jobs on this queue. *)
54 let new_queue () = { q_lock = Mutex.create (); q = [] } (* Empty job queue with a fresh, unlocked q_lock. *)
56 (* All of the shared state below is protected by this lock that
57 * you must hold before using any of it.
59 let lock = Mutex.create () (* Condition.wait idle lock releases this while blocked and re-acquires it on wake-up. *)
61 (* Jobs are queued on separate queues according to their key.
62 * qs is a map of the key to the list of jobs in that queue.
64 module Qs = Map.Make (K) (* Map from job key to queue, ordered by K.compare. *)
67 (* Threads which are idle wait on this condition. This is
69 * - a new job is added (idle threads may be able to run it)
70 * - a job finishes (idle threads may be able to run another
71 * job which has the same key as the one which finished)
73 let idle = Condition.create () (* Always waited on together with [lock]. *)
75 (* Threads which are running or idle but NOT waiting. This
76 * starts as one because the main thread is running. A thread
77 * which is waiting is essentially blocking another job which
78 * could run, so we should start a new thread. A thread which
79 * is idle on the other hand is not blocking anything from
80 * running, it's idle because there is nothing that can be run.
82 * We aim to keep this <= Cmdline.nr_jobs.
86 (* If stop_all is called, this is set to true and we stop
91 (* The worker thread. *)
93 let id = Thread.id (Thread.self ()) in
96 while not !stop && !ready <= Cmdline.nr_jobs () do
97 (* See if there's any queue with a job which is ready to run. *)
98 Cmdline.debug "thread %d: checking for a runnable queue" id;
99 match get_runnable_queue () with
101 (* Nothing that we can run, go idle. This also drops
102 * the lock so other threads can examine the queue.
104 Cmdline.debug "thread %d: idle" id;
105 Condition.wait idle lock;
107 (* Note that q.q_lock is now held by this thread, and q.q
108 * is non-empty. Pick the job off the head of this queue.
110 let job = List.hd q.q in
113 (* Run the job, dropping the main lock while running. *)
114 job.state <- Running;
116 Cmdline.debug "thread %d: running job" id;
117 let exn = try job.f (); None with exn -> Some exn in
118 Cmdline.debug "thread %d: finished job" id;
122 (* Since we have finished a job, it may be that other
123 * idle threads could now run (if a job with the same
126 Mutex.unlock q.q_lock;
127 Condition.broadcast idle
132 (* Check all the queues to see if there is any job which can run.
133 * The lock must be held when calling this function. This
134 * locks the queue if it finds one.
136 and get_runnable_queue () =
138 let qs = List.map snd (Qs.bindings !qs) in
139 Some (List.find is_runnable_queue qs)
143 (* Return true iff the queue contains jobs and no existing job
144 * from this queue is already running. This locks the queue
145 * if it returns true.
147 and is_runnable_queue queue =
148 match queue.q with [] -> false (* nothing queued under this key *)
149 | _ :: _ -> Mutex.try_lock queue.q_lock (* true iff we now own the queue *)
151 (* A group is the mutable list of jobs submitted to it by start; wait blocks until every one of them is Done (or stop is set). *)
152 type group = job list ref
153 let new_group () = ref []
155 (* Submit a new job. *)
156 let start group key f =
157 let id = Thread.id (Thread.self ()) in
158 Cmdline.debug "thread %d: submitting new job" id;
160 let job = { state = Waiting; f; exn = None } in
161 group := job :: !group;
163 (* Put the job on the queue associated with this key. *)
167 let q = new_queue () in (* Allocate a new queue for this key. *)
168 qs := Qs.add key q !qs;
172 (* Wake up any idle threads. *)
173 Condition.signal idle;
176 (* Wait for all jobs in the group to be done. *)
178 let id = Thread.id (Thread.self ()) in
180 while not !stop && not (all_done group); do
182 (* Start more threads if fewer than nr_jobs threads are ready. *)
183 let needed = Cmdline.nr_jobs () - !ready in
184 if not !stop && needed > 0 then
185 ignore (Array.init needed (Thread.create worker));
187 Cmdline.debug "thread %d: waiting for group to complete" id;
188 Condition.wait idle lock;
193 (* If any job in the group raised an exception, we re-raise it here.
194 * We can only re-raise the first exception, so if there are
195 * multiple failures the other exceptions are lost, but that doesn't
196 * really matter as long as goals exits with an error. Note that if
197 * we are being called from a worker, then the exception we raise
198 * here may be saved and reraised in another wait.
205 Cmdline.debug "thread %d: raising exception in wait: %s"
206 id (Printexc.to_string exn);
211 (* It can happen that we didn't finish all jobs, especially if
212 * the stop flag was set in another thread. In this case we
213 * shouldn't just return as if everything is fine because it
214 * will cause the job to continue. Instead make sure we
215 * raise an error in this case.
217 if not (all_done group) then
218 failwithf "job cancelled because of earlier error"
220 (* Test if all jobs in a group are done. Note you must hold
223 and all_done group = not (List.exists (fun job -> job.state <> Done) !group)
227 (* All threads will exit after running jobs if this is set. *)
230 Condition.wait idle lock;