X-Git-Url: http://git.annexia.org/?a=blobdiff_plain;f=src%2Fjobs.ml;h=2e8735e4b0a7bed699ab42c8be5fc36f6659a449;hb=ff4a8c81429495ae914d0dbeef9dbb50b2b1a00c;hp=f9ead5298a73bdbebaf9ae4794fb2f63f5193a95;hpb=2a9d33a300ac414c21679c520bc6434d48f499a9;p=goals.git diff --git a/src/jobs.ml b/src/jobs.ml index f9ead52..2e8735e 100644 --- a/src/jobs.ml +++ b/src/jobs.ml @@ -17,172 +17,75 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. *) -module type Key = sig - type t - val compare : t -> t -> int - val to_string : t -> string -end +open Utils -module type Jobs = sig - type key - type group - val new_group : unit -> group - val start : group -> key -> (unit -> unit) -> unit - val wait : group -> unit -end +type 'a next = Job of 'a * (unit -> unit) | Complete | Not_ready -module Make (K : Key) = struct - type key = K.t +type 'a retire = 'a -> unit - type state = Waiting | Running | Done - type job = { - mutable state : state; - f : unit -> unit; (* The function to run the job. *) - } +type 'a to_string = 'a -> string - type queue = { - (* Lock preventing multiple jobs with the same key from - * running at the same time. - *) - q_lock : Mutex.t; - mutable q : job list; (* List of jobs on this queue. *) - } - let new_queue () = { q_lock = Mutex.create (); q = [] } +let run next_job retire_job string_of_job = + (* Number of running threads <= Cmdline.nr_jobs. *) + let running = ref 0 in - (* All of the shared state below is protected by this lock that - * you must hold before using any of it. - *) - let lock = Mutex.create () + (* Lock and condition for when a thread exits. *) + let lock = Mutex.create () and cond = Condition.create () in - (* Jobs are queued on separate queues according to their key. - * qs is a map of the key to the list of jobs in that queue. - *) - module Qs = Map.Make (K) - let qs = ref Qs.empty + (* If a job throws an exception it is saved here. *) + let last_exn = ref None in - (* Threads which are idle wait on this condition. This is - * signalled when: - * - a new job is added (idle threads may be able to run it) - * - a job finishes (idle threads may be able to run another - * job which has the same key as the one which finished) - *) - let idle = Condition.create () + (* This is the background thread which runs each job. *) + let runner (job, f) = + let exn = try f (); None with exn -> Some exn in - (* Threads which are running or idle but NOT waiting. This - * starts as one because the main thread is running. A thread - * which is waiting is essentially blocking another job which - * could run, so we should start a new thread. A thread which - * is idle on the other hand is not blocking anything from - * running, it's idle because there is nothing that can be run. - * - * We aim to keep this <= Cmdline.nr_jobs. - *) - let ready = ref 1 - - (* The worker thread. *) - let rec worker _ = - let id = Thread.id (Thread.self ()) in - Mutex.lock lock; - incr ready; - while !ready <= Cmdline.nr_jobs do - (* See if there's any queue with a job which is ready to run. *) - Cmdline.debug "thread %d: checking for a runnable queue" id; - match get_runnable_queue () with - | None -> - (* Nothing that we can run, go idle. This also drops - * the lock so other threads can examine the queue. - *) - Cmdline.debug "thread %d: idle" id; - Condition.wait idle lock; - | Some q -> - (* Note that q.q_lock is now held by this thread, and q.q - * is non-empty. Pick the job off the head of this queue. - *) - let job = List.hd q.q in - q.q <- List.tl q.q; - - (* Run the job, dropping the main lock while running. *) - job.state <- Running; - Mutex.unlock lock; - Cmdline.debug "thread %d: running job" id; - job.f (); - Cmdline.debug "thread %d: finished job" id; - Mutex.lock lock; - job.state <- Done; - (* Since we have finished a job, it may be that other - * idle threads could now run (if a job with the same - * key is waiting). - *) - Mutex.unlock q.q_lock; - Condition.broadcast idle - done; - decr ready; - Mutex.unlock lock - - (* Check all the queues to see if there is any job which can run. - * The lock must be held when calling this function. This - * locks the queue if it finds one. - *) - and get_runnable_queue () = - try - let qs = List.map snd (Qs.bindings !qs) in - Some (List.find is_runnable_queue qs) - with - Not_found -> None - - (* Return true iff the queue contains jobs and no existing job - * from this queue is already running. This locks the queue - * if it returns true. - *) - and is_runnable_queue = function - | { q = [] } -> false - | { q_lock } -> Mutex.try_lock q_lock - - (* A group is simply a list of jobs. *) - type group = job list ref - let new_group () = ref [] - - (* Submit a new job. *) - let start group key f = - let id = Thread.id (Thread.self ()) in - Cmdline.debug "thread %d: submitting new job" id; - Mutex.lock lock; - let job = { state = Waiting; f } in - group := job :: !group; - - (* Put the job on the queue associated with this key. *) - let q = - try Qs.find key !qs - with Not_found -> - let q = new_queue () in (* Allocate a new queue for this key. *) - qs := Qs.add key q !qs; - q in - q.q <- q.q @ [job]; - - (* Wake up any idle threads. *) - Condition.signal idle; - Mutex.unlock lock - - (* Wait for all jobs in the group to be done. *) - let rec wait group = - let id = Thread.id (Thread.self ()) in Mutex.lock lock; - while not (all_done group); do - decr ready; - (* Start more threads if fewer than nr_jobs threads are ready. *) - let needed = Cmdline.nr_jobs - !ready in - if needed > 0 then - ignore (Array.init needed (Thread.create worker)); - - Cmdline.debug "thread %d: waiting for group to complete" id; - Condition.wait idle lock; - incr ready - done; + (match exn with + | None -> retire_job job + | Some exn -> last_exn := Some exn + ); + decr running; + Condition.signal cond; Mutex.unlock lock - - (* Test if all jobs in a group are done. Note you must hold - * the lock. - *) - and all_done group = List.for_all (fun { state } -> state = Done) !group - -end + in + + let rec loop () = + if !last_exn = None then ( + match next_job () with + | Complete -> () + | Not_ready -> + assert (!running > 0); + Cmdline.debug "%d/%d threads running, waiting for dependencies" + !running (Cmdline.nr_jobs ()); + (* Wait for any running thread to finish. *) + Condition.wait cond lock; + loop () + | Job (job, f) -> + incr running; + ignore (Thread.create runner (job, f)); + (* If we've reached the limit on number of threads, wait + * for any running thread to finish. + *) + while !running >= Cmdline.nr_jobs () do + Condition.wait cond lock + done; + loop () + ) + in + Mutex.lock lock; + loop (); + + (* Wait for all jobs to complete. *) + while !running > 0 do + Cmdline.debug "%d/%d threads running, waiting for completion" + !running (Cmdline.nr_jobs ()); + Condition.wait cond lock + done; + + let exn = !last_exn in + Mutex.unlock lock; + + (* Re-raise the saved exception from the job which failed. *) + match exn with + | None -> () + | Some exn -> raise exn