Split implementation into dependency analysis and traversal.
[goals.git] / src / jobs.ml
index cdf4e29..085d466 100644 (file)
 
 open Utils
 
-module type Key = sig
-  type t
-  val compare : t -> t -> int
-  val to_string : t -> string
-end
+type 'a next = Job of 'a * (unit -> unit) | Complete | Not_ready
 
-module type Jobs = sig
-  type key
-  type group
-  val new_group : unit -> group
-  val start : group -> key -> (unit -> unit) -> unit
-  val wait : group -> unit
-  val stop_all : unit -> unit
-end
+type 'a retire = 'a -> unit
 
-module Make (K : Key) = struct
-  type key = K.t
+type 'a to_string = 'a -> string
 
-  type state = Waiting | Running | Done
-  type job = {
-    mutable state : state;
-    f : unit -> unit;           (* The function to run the job. *)
-    mutable exn : exn option;   (* If the job raised an exception. *)
-  }
+let run next_job retire_job string_of_job =
+  (* Number of running threads <= Cmdline.nr_jobs. *)
+  let running = ref 0 in
 
-  type queue = {
-    (* Lock preventing multiple jobs with the same key from
-     * running at the same time.
-     *)
-    q_lock : Mutex.t;
-    mutable q : job list;       (* List of jobs on this queue. *)
-  }
-  let new_queue () = { q_lock = Mutex.create (); q = [] }
+  (* Lock and condition for when a thread exits. *)
+  let lock = Mutex.create () and cond = Condition.create () in
 
-  (* All of the shared state below is protected by this lock that
-   * you must hold before using any of it.
-   *)
-  let lock = Mutex.create ()
+  (* If a job throws an exception it is saved here. *)
+  let last_exn = ref None in
 
-  (* Jobs are queued on separate queues according to their key.
-   * qs is a map of the key to the list of jobs in that queue.
-   *)
-  module Qs = Map.Make (K)
-  let qs = ref Qs.empty
+  (* This is the background thread which runs each job. *)
+  let runner (job, f) =
+    let exn = try f (); None with exn -> Some exn in
 
-  (* Threads which are idle wait on this condition.  This is
-   * signalled when:
-   *  - a new job is added (idle threads may be able to run it)
-   *  - a job finishes (idle threads may be able to run another
-   *    job which has the same key as the one which finished)
-   *)
-  let idle = Condition.create ()
-
-  (* Threads which are running or idle but NOT waiting.  This
-   * starts as one because the main thread is running.  A thread
-   * which is waiting is essentially blocking another job which
-   * could run, so we should start a new thread.  A thread which
-   * is idle on the other hand is not blocking anything from
-   * running, it's idle because there is nothing that can be run.
-   *
-   * We aim to keep this <= Cmdline.nr_jobs.
-   *)
-  let ready = ref 1
-
-  (* If stop_all is called, this is set to true and we stop
-   * running new jobs.
-   *)
-  let stop = ref false
-
-  (* The worker thread. *)
-  let rec worker _ =
-    let id = Thread.id (Thread.self ()) in
-    Mutex.lock lock;
-    incr ready;
-    while not !stop && !ready <= Cmdline.nr_jobs () do
-      (* See if there's any queue with a job which is ready to run. *)
-      Cmdline.debug "thread %d: checking for a runnable queue" id;
-      match get_runnable_queue () with
-      | None ->
-         (* Nothing that we can run, go idle.  This also drops
-          * the lock so other threads can examine the queue.
-          *)
-         Cmdline.debug "thread %d: idle" id;
-         Condition.wait idle lock;
-      | Some q ->
-         (* Note that q.q_lock is now held by this thread, and q.q
-          * is non-empty.  Pick the job off the head of this queue.
-          *)
-         let job = List.hd q.q in
-         q.q <- List.tl q.q;
-
-         (* Run the job, dropping the main lock while running. *)
-         job.state <- Running;
-         Mutex.unlock lock;
-         Cmdline.debug "thread %d: running job" id;
-         let exn = try job.f (); None with exn -> Some exn in
-         Cmdline.debug "thread %d: finished job" id;
-         Mutex.lock lock;
-         job.state <- Done;
-         job.exn <- exn;
-         (* Since we have finished a job, it may be that other
-          * idle threads could now run (if a job with the same
-          * key is waiting).
-          *)
-         Mutex.unlock q.q_lock;
-         Condition.broadcast idle
-    done;
-    decr ready;
-    Mutex.unlock lock
-
-  (* Check all the queues to see if there is any job which can run.
-   * The lock must be held when calling this function.  This
-   * locks the queue if it finds one.
-   *)
-  and get_runnable_queue () =
-    try
-      let qs = List.map snd (Qs.bindings !qs) in
-      Some (List.find is_runnable_queue qs)
-    with
-      Not_found -> None
-
-  (* Return true iff the queue contains jobs and no existing job
-   * from this queue is already running.  This locks the queue
-   * if it returns true.
-   *)
-  and is_runnable_queue = function
-    | { q = [] } -> false
-    | { q_lock } -> Mutex.try_lock q_lock
-
-  (* A group is simply a list of jobs. *)
-  type group = job list ref
-  let new_group () = ref []
-
-  (* Submit a new job. *)
-  let start group key f =
-    let id = Thread.id (Thread.self ()) in
-    Cmdline.debug "thread %d: submitting new job" id;
     Mutex.lock lock;
-    let job = { state = Waiting; f; exn = None } in
-    group := job :: !group;
-
-    (* Put the job on the queue associated with this key. *)
-    let q =
-      try Qs.find key !qs
-      with Not_found ->
-        let q = new_queue () in (* Allocate a new queue for this key. *)
-        qs := Qs.add key q !qs;
-        q in
-    q.q <- q.q @ [job];
-
-    (* Wake up any idle threads. *)
-    Condition.signal idle;
+    (match exn with
+     | None -> retire_job job
+     | Some exn -> last_exn := Some exn
+    );
+    decr running;
+    Condition.signal cond;
     Mutex.unlock lock
-
-  (* Wait for all jobs in the group to be done. *)
-  let rec wait group =
-    let id = Thread.id (Thread.self ()) in
-    Mutex.lock lock;
-    while not !stop && not (all_done group); do
-      decr ready;
-      (* Start more threads if fewer than nr_jobs threads are ready. *)
-      let needed = Cmdline.nr_jobs () - !ready in
-      if not !stop && needed > 0 then
-        ignore (Array.init needed (Thread.create worker));
-
-      Cmdline.debug "thread %d: waiting for group to complete" id;
-      Condition.wait idle lock;
-      incr ready
-    done;
-    Mutex.unlock lock;
-
-    (* If any job in the group raised an exception, we re-raise it here.
-     * We can only reraise the first exception though so if there are
-     * multiple failures then the exceptions are lost, but that doesn't
-     * really matter as long as goals exits with an error.  Note that if
-     * we are being called from a worker, then the exception we raise
-     * here may be saved and reraised in another wait.
-     *)
-    List.iter (
-      fun job ->
-        match job.exn with
-        | None -> ()
-        | Some exn ->
-           Cmdline.debug "thread %d: raising exception in wait: %s"
-             id (Printexc.to_string exn);
-           stop := true;
-           raise exn
-    ) !group;
-
-    (* It can happen that we didn't finish all jobs, especially if
-     * the stop flag was set in another thread.  In this case we
-     * shouldn't just return as if everything is fine because it
-     * will cause the job to continue.  Instead make sure we
-     * raise an error in this case.
-     *)
-    if not (all_done group) then
-      failwithf "job cancelled because of earlier error"
-
-  (* Test if all jobs in a group are done.  Note you must hold
-   * the lock.
-   *)
-  and all_done group = List.for_all (fun { state } -> state = Done) !group
-
-  let stop_all () =
-    Mutex.lock lock;
-    (* All threads will exit after running jobs if this is set. *)
-    stop := true;
-    while !ready > 1 do
-      Condition.wait idle lock;
-    done;
-    Mutex.unlock lock
-
-end
+  in
+
+  let rec loop () =
+    if !last_exn = None then (
+      match next_job () with
+      | Complete ->
+         if !running > 0 then (
+           Cmdline.debug "%d/%d threads running, waiting for completion"
+             !running (Cmdline.nr_jobs ());
+           Condition.wait cond lock;
+           loop ()
+         )
+      | Not_ready ->
+         assert (!running > 0);
+         Cmdline.debug "%d/%d threads running, waiting for dependencies"
+           !running (Cmdline.nr_jobs ());
+         (* Wait for any running thread to finish. *)
+         Condition.wait cond lock;
+         loop ()
+      | Job (job, f) ->
+         incr running;
+         ignore (Thread.create runner (job, f));
+         (* If we've reached the limit on number of threads, wait
+          * for any running thread to finish.
+          *)
+         while !running >= Cmdline.nr_jobs () do
+           Condition.wait cond lock
+         done;
+         loop ()
+    )
+  in
+  Mutex.lock lock;
+  loop ();
+  let exn = !last_exn in
+  Mutex.unlock lock;
+
+  (* Re-raise the saved exception from the job which failed. *)
+  match exn with
+  | None -> ()
+  | Some exn -> raise exn