1 (* Goals parallel jobs.
2 * Copyright (C) 2020 Richard W.M. Jones
3 * Copyright (C) 2020 Red Hat Inc.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 val compare : t -> t -> int
23 val to_string : t -> string
26 module type Jobs = sig
29 val new_group : unit -> group
30 val start : group -> key -> (unit -> unit) -> unit
31 val wait : group -> unit
34 module Make (K : Key) = struct
(* Lifecycle of a job: it is created as Waiting, marked Running by
 * a worker just before the job's function is run, and compared
 * against Done when testing whether a group has completed.
 *)
37 type state = Waiting | Running | Done
39 mutable state : state;
40 f : unit -> unit; (* The function to run the job. *)
44 (* Lock preventing multiple jobs with the same key from
45 * running at the same time.
48 mutable q : job list; (* List of jobs on this queue. *)
(* Build a fresh queue: no jobs yet, and its own newly created lock. *)
let new_queue () =
  let q_lock = Mutex.create () in
  { q_lock; q = [] }
52 (* All of the shared state below is protected by this lock, which
53 * you must hold before using any of it. *)
55 let lock = Mutex.create ()
57 (* Jobs are queued on separate queues according to their key.
58 * qs is a map from each key to the queue of jobs for that key. *)
60 module Qs = Map.Make (K)
63 (* Threads which are idle wait on this condition. It is signalled when:
65 * - a new job is added (idle threads may be able to run it)
66 * - a job finishes (idle threads may be able to run another
67 * job which has the same key as the one which finished) *)
69 let idle = Condition.create ()
71 (* Threads which are running or idle but NOT waiting. This
72 * starts as one because the main thread is running. A thread
73 * which is waiting is essentially blocking another job which
74 * could run, so we should start a new thread. A thread which
75 * is idle, on the other hand, is not blocking anything from
76 * running; it is idle because there is nothing that can be run.
78 * We aim to keep this <= Cmdline.nr_jobs. *)
82 (* The worker thread. *)
84 let id = Thread.id (Thread.self ()) in
87 while !ready <= Cmdline.nr_jobs do
88 (* See if there's any queue with a job which is ready to run. *)
89 Cmdline.debug "thread %d: checking for a runnable queue" id;
90 match get_runnable_queue () with
92 (* Nothing that we can run, go idle. This also drops
93 * the lock so other threads can examine the queue.
95 Cmdline.debug "thread %d: idle" id;
96 Condition.wait idle lock;
98 (* Note that q.q_lock is now held by this thread, and q.q
99 * is non-empty. Pick the job off the head of this queue.
101 let job = List.hd q.q in
104 (* Run the job, dropping the main lock while running. *)
105 job.state <- Running;
107 Cmdline.debug "thread %d: running job" id;
109 Cmdline.debug "thread %d: finished job" id;
112 (* Since we have finished a job, it may be that other
113 * idle threads could now run (if a job with the same
116 Mutex.unlock q.q_lock;
117 Condition.broadcast idle
122 (* Check all the queues to see if there is any job which can run.
123 * The lock must be held when calling this function. This
124 * locks the queue if it finds one.
126 and get_runnable_queue () =
128 let qs = List.map snd (Qs.bindings !qs) in
129 Some (List.find is_runnable_queue qs)
(* Decide whether a queue can be serviced right now.  The queue must
 * hold at least one job, and its per-queue lock must be free, meaning
 * no job from this queue is already running.  When the answer is
 * true the queue's lock has been taken (via Mutex.try_lock) and is
 * left held for the caller.
 *)
and is_runnable_queue queue =
  match queue.q with
  | [] -> false
  | _ :: _ -> Mutex.try_lock queue.q_lock
(* A group is nothing more than a mutable list of the jobs that
 * have been submitted to it; a new group starts out empty.
 *)
type group = job list ref
let new_group () : group = ref []
145 (* Submit a new job. *)
146 let start group key f =
147 let id = Thread.id (Thread.self ()) in
148 Cmdline.debug "thread %d: submitting new job" id;
150 let job = { state = Waiting; f } in
151 group := job :: !group;
153 (* Put the job on the queue associated with this key. *)
157 let q = new_queue () in (* Allocate a new queue for this key. *)
158 qs := Qs.add key q !qs;
162 (* Wake up any idle threads. *)
163 Condition.signal idle;
166 (* Wait for all jobs in the group to be done. *)
168 let id = Thread.id (Thread.self ()) in
170 while not (all_done group); do
172 (* Start more threads if fewer than nr_jobs threads are ready. *)
173 let needed = Cmdline.nr_jobs - !ready in
175 ignore (Array.init needed (Thread.create worker));
177 Cmdline.debug "thread %d: waiting for group to complete" id;
178 Condition.wait idle lock;
(* A group is complete once none of its jobs is in a state other
 * than Done (an empty group is therefore complete).  Job state is
 * part of the shared state, so callers are expected to hold the
 * main lock while calling this.
 *)
and all_done group =
  not (List.exists (fun job -> job.state <> Done) !group)