- (* Threads which are idle wait on this condition. This is
- * signalled when:
- * - a new job is added (idle threads may be able to run it)
- * - a job finishes (idle threads may be able to run another
- * job which has the same key as the one which finished)
- *)
- let idle = Condition.create ()
-
- (* Threads which are running or idle but NOT waiting. This
- * starts as one because the main thread is running. A thread
- * which is waiting is essentially blocking another job which
- * could run, so we should start a new thread. A thread which
- * is idle on the other hand is not blocking anything from
- * running, it's idle because there is nothing that can be run.
- *
- * We aim to keep this <= Cmdline.nr_jobs.
- *)
- let ready = ref 1
-
- (* If stop_all is called, this is set to true and we stop
- * running new jobs.
- *)
- let stop = ref false
-
- (* The worker thread. *)
- let rec worker _ =
- let id = Thread.id (Thread.self ()) in
- Mutex.lock lock;
- incr ready;
- while not !stop && !ready <= Cmdline.nr_jobs () do
- (* See if there's any queue with a job which is ready to run. *)
- Cmdline.debug "thread %d: checking for a runnable queue" id;
- match get_runnable_queue () with
- | None ->
- (* Nothing that we can run, go idle. This also drops
- * the lock so other threads can examine the queue.
- *)
- Cmdline.debug "thread %d: idle" id;
- Condition.wait idle lock;
- | Some q ->
- (* Note that q.q_lock is now held by this thread, and q.q
- * is non-empty. Pick the job off the head of this queue.
- *)
- let job = List.hd q.q in
- q.q <- List.tl q.q;
-
- (* Run the job, dropping the main lock while running. *)
- job.state <- Running;
- Mutex.unlock lock;
- Cmdline.debug "thread %d: running job" id;
- let exn = try job.f (); None with exn -> Some exn in
- Cmdline.debug "thread %d: finished job" id;
- Mutex.lock lock;
- job.state <- Done;
- job.exn <- exn;
- (* Since we have finished a job, it may be that other
- * idle threads could now run (if a job with the same
- * key is waiting).
- *)
- Mutex.unlock q.q_lock;
- Condition.broadcast idle
- done;
- decr ready;
- Mutex.unlock lock
-
- (* Check all the queues to see if there is any job which can run.
- * The lock must be held when calling this function. This
- * locks the queue if it finds one.
- *)
- and get_runnable_queue () =
- try
- let qs = List.map snd (Qs.bindings !qs) in
- Some (List.find is_runnable_queue qs)
- with
- Not_found -> None
-
- (* Return true iff the queue contains jobs and no existing job
- * from this queue is already running. This locks the queue
- * if it returns true.
- *)
- and is_runnable_queue = function
- | { q = [] } -> false
- | { q_lock } -> Mutex.try_lock q_lock
-
- (* A group is simply a list of jobs. *)
- type group = job list ref
- let new_group () = ref []
-
- (* Submit a new job. *)
- let start group key f =
- let id = Thread.id (Thread.self ()) in
- Cmdline.debug "thread %d: submitting new job" id;