X-Git-Url: http://git.annexia.org/?a=blobdiff_plain;f=src%2Fjobs.ml;h=085d4660c175772f0f87814cf176eaf5210fb58a;hb=a70987f316ab4b948bf941ddea8fb6ccef09da4f;hp=cdf4e29a3e0461dd37f236412e8d02adebb4efd2;hpb=0aead2922062dc8a4e7dc88fe7776f92ac6c232a;p=goals.git diff --git a/src/jobs.ml b/src/jobs.ml index cdf4e29..085d466 100644 --- a/src/jobs.ml +++ b/src/jobs.ml @@ -19,216 +19,71 @@ open Utils -module type Key = sig - type t - val compare : t -> t -> int - val to_string : t -> string -end +type 'a next = Job of 'a * (unit -> unit) | Complete | Not_ready -module type Jobs = sig - type key - type group - val new_group : unit -> group - val start : group -> key -> (unit -> unit) -> unit - val wait : group -> unit - val stop_all : unit -> unit -end +type 'a retire = 'a -> unit -module Make (K : Key) = struct - type key = K.t +type 'a to_string = 'a -> string - type state = Waiting | Running | Done - type job = { - mutable state : state; - f : unit -> unit; (* The function to run the job. *) - mutable exn : exn option; (* If the job raised an exception. *) - } +let run next_job retire_job string_of_job = + (* Number of running threads <= Cmdline.nr_jobs. *) + let running = ref 0 in - type queue = { - (* Lock preventing multiple jobs with the same key from - * running at the same time. - *) - q_lock : Mutex.t; - mutable q : job list; (* List of jobs on this queue. *) - } - let new_queue () = { q_lock = Mutex.create (); q = [] } + (* Lock and condition for when a thread exits. *) + let lock = Mutex.create () and cond = Condition.create () in - (* All of the shared state below is protected by this lock that - * you must hold before using any of it. - *) - let lock = Mutex.create () + (* If a job throws an exception it is saved here. *) + let last_exn = ref None in - (* Jobs are queued on separate queues according to their key. - * qs is a map of the key to the list of jobs in that queue. - *) - module Qs = Map.Make (K) - let qs = ref Qs.empty + (* This is the background thread which runs each job. *) + let runner (job, f) = + let exn = try f (); None with exn -> Some exn in - (* Threads which are idle wait on this condition. This is - * signalled when: - * - a new job is added (idle threads may be able to run it) - * - a job finishes (idle threads may be able to run another - * job which has the same key as the one which finished) - *) - let idle = Condition.create () - - (* Threads which are running or idle but NOT waiting. This - * starts as one because the main thread is running. A thread - * which is waiting is essentially blocking another job which - * could run, so we should start a new thread. A thread which - * is idle on the other hand is not blocking anything from - * running, it's idle because there is nothing that can be run. - * - * We aim to keep this <= Cmdline.nr_jobs. - *) - let ready = ref 1 - - (* If stop_all is called, this is set to true and we stop - * running new jobs. - *) - let stop = ref false - - (* The worker thread. *) - let rec worker _ = - let id = Thread.id (Thread.self ()) in - Mutex.lock lock; - incr ready; - while not !stop && !ready <= Cmdline.nr_jobs () do - (* See if there's any queue with a job which is ready to run. *) - Cmdline.debug "thread %d: checking for a runnable queue" id; - match get_runnable_queue () with - | None -> - (* Nothing that we can run, go idle. This also drops - * the lock so other threads can examine the queue. - *) - Cmdline.debug "thread %d: idle" id; - Condition.wait idle lock; - | Some q -> - (* Note that q.q_lock is now held by this thread, and q.q - * is non-empty. Pick the job off the head of this queue. - *) - let job = List.hd q.q in - q.q <- List.tl q.q; - - (* Run the job, dropping the main lock while running. *) - job.state <- Running; - Mutex.unlock lock; - Cmdline.debug "thread %d: running job" id; - let exn = try job.f (); None with exn -> Some exn in - Cmdline.debug "thread %d: finished job" id; - Mutex.lock lock; - job.state <- Done; - job.exn <- exn; - (* Since we have finished a job, it may be that other - * idle threads could now run (if a job with the same - * key is waiting). - *) - Mutex.unlock q.q_lock; - Condition.broadcast idle - done; - decr ready; - Mutex.unlock lock - - (* Check all the queues to see if there is any job which can run. - * The lock must be held when calling this function. This - * locks the queue if it finds one. - *) - and get_runnable_queue () = - try - let qs = List.map snd (Qs.bindings !qs) in - Some (List.find is_runnable_queue qs) - with - Not_found -> None - - (* Return true iff the queue contains jobs and no existing job - * from this queue is already running. This locks the queue - * if it returns true. - *) - and is_runnable_queue = function - | { q = [] } -> false - | { q_lock } -> Mutex.try_lock q_lock - - (* A group is simply a list of jobs. *) - type group = job list ref - let new_group () = ref [] - - (* Submit a new job. *) - let start group key f = - let id = Thread.id (Thread.self ()) in - Cmdline.debug "thread %d: submitting new job" id; Mutex.lock lock; - let job = { state = Waiting; f; exn = None } in - group := job :: !group; - - (* Put the job on the queue associated with this key. *) - let q = - try Qs.find key !qs - with Not_found -> - let q = new_queue () in (* Allocate a new queue for this key. *) - qs := Qs.add key q !qs; - q in - q.q <- q.q @ [job]; - - (* Wake up any idle threads. *) - Condition.signal idle; + (match exn with + | None -> retire_job job + | Some exn -> last_exn := Some exn + ); + decr running; + Condition.signal cond; Mutex.unlock lock - - (* Wait for all jobs in the group to be done. *) - let rec wait group = - let id = Thread.id (Thread.self ()) in - Mutex.lock lock; - while not !stop && not (all_done group); do - decr ready; - (* Start more threads if fewer than nr_jobs threads are ready. *) - let needed = Cmdline.nr_jobs () - !ready in - if not !stop && needed > 0 then - ignore (Array.init needed (Thread.create worker)); - - Cmdline.debug "thread %d: waiting for group to complete" id; - Condition.wait idle lock; - incr ready - done; - Mutex.unlock lock; - - (* If any job in the group raised an exception, we re-raise it here. - * We can only reraise the first exception though so if there are - * multiple failures then the exceptions are lost, but that doesn't - * really matter as long as goals exits with an error. Note that if - * we are being called from a worker, then the exception we raise - * here may be saved and reraised in another wait. - *) - List.iter ( - fun job -> - match job.exn with - | None -> () - | Some exn -> - Cmdline.debug "thread %d: raising exception in wait: %s" - id (Printexc.to_string exn); - stop := true; - raise exn - ) !group; - - (* It can happen that we didn't finish all jobs, especially if - * the stop flag was set in another thread. In this case we - * shouldn't just return as if everything is fine because it - * will cause the job to continue. Instead make sure we - * raise an error in this case. - *) - if not (all_done group) then - failwithf "job cancelled because of earlier error" - - (* Test if all jobs in a group are done. Note you must hold - * the lock. - *) - and all_done group = List.for_all (fun { state } -> state = Done) !group - - let stop_all () = - Mutex.lock lock; - (* All threads will exit after running jobs if this is set. *) - stop := true; - while !ready > 1 do - Condition.wait idle lock; - done; - Mutex.unlock lock - -end + in + + let rec loop () = + if !last_exn = None then ( + match next_job () with + | Complete -> + if !running > 0 then ( + Cmdline.debug "%d/%d threads running, waiting for completion" + !running (Cmdline.nr_jobs ()); + Condition.wait cond lock; + loop () + ) + | Not_ready -> + assert (!running > 0); + Cmdline.debug "%d/%d threads running, waiting for dependencies" + !running (Cmdline.nr_jobs ()); + (* Wait for any running thread to finish. *) + Condition.wait cond lock; + loop () + | Job (job, f) -> + incr running; + ignore (Thread.create runner (job, f)); + (* If we've reached the limit on number of threads, wait + * for any running thread to finish. + *) + while !running >= Cmdline.nr_jobs () do + Condition.wait cond lock + done; + loop () + ) + in + Mutex.lock lock; + loop (); + let exn = !last_exn in + Mutex.unlock lock; + + (* Re-raise the saved exception from the job which failed. *) + match exn with + | None -> () + | Some exn -> raise exn