* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*)
+open Utils
+
module type Key = sig
type t
val compare : t -> t -> int
type job = {
mutable state : state;
f : unit -> unit; (* The function to run the job. *)
+ mutable exn : exn option; (* If the job raised an exception. *)
}
type queue = {
job.state <- Running;
Mutex.unlock lock;
Cmdline.debug "thread %d: running job" id;
- job.f ();
+ let exn = try job.f (); None with exn -> Some exn in
Cmdline.debug "thread %d: finished job" id;
Mutex.lock lock;
job.state <- Done;
+ job.exn <- exn;
(* Since we have finished a job, it may be that other
* idle threads could now run (if a job with the same
* key is waiting).
let id = Thread.id (Thread.self ()) in
Cmdline.debug "thread %d: submitting new job" id;
Mutex.lock lock;
- let job = { state = Waiting; f } in
+ let job = { state = Waiting; f; exn = None } in
group := job :: !group;
(* Put the job on the queue associated with this key. *)
let rec wait group =
let id = Thread.id (Thread.self ()) in
Mutex.lock lock;
- while not (all_done group); do
+ while not !stop && not (all_done group); do
decr ready;
(* Start more threads if fewer than nr_jobs threads are ready. *)
let needed = Cmdline.nr_jobs () - !ready in
- if needed > 0 then
+ if not !stop && needed > 0 then
ignore (Array.init needed (Thread.create worker));
Cmdline.debug "thread %d: waiting for group to complete" id;
Condition.wait idle lock;
incr ready
done;
- Mutex.unlock lock
+ Mutex.unlock lock;
+
+ (* If any job in the group raised an exception, we re-raise it here.
+ * We can only reraise the first exception though so if there are
+ * multiple failures then the exceptions are lost, but that doesn't
+ * really matter as long as goals exits with an error. Note that if
+ * we are being called from a worker, then the exception we raise
+ * here may be saved and reraised in another wait.
+ *)
+ List.iter (
+ fun job ->
+ match job.exn with
+ | None -> ()
+ | Some exn ->
+ Cmdline.debug "thread %d: raising exception in wait: %s"
+ id (Printexc.to_string exn);
+ stop := true;
+ raise exn
+ ) !group;
+
+ (* It can happen that we didn't finish all jobs, especially if
+ * the stop flag was set in another thread. In this case we
+ * shouldn't just return as if everything is fine because it
+ * will cause the job to continue. Instead make sure we
+ * raise an error in this case.
+ *)
+ if not (all_done group) then
+ failwithf "job cancelled because of earlier error"
(* Test if all jobs in a group are done. Note you must hold
* the lock.