1 (* Goals parallel jobs.
2 * Copyright (C) 2020 Richard W.M. Jones
3 * Copyright (C) 2020 Red Hat Inc.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 val compare : t -> t -> int
23 val to_string : t -> string
26 module type Jobs = sig
29 val new_group : unit -> group
30 val start : group -> key -> (unit -> unit) -> unit
31 val wait : group -> unit
32 val stop_all : unit -> unit
35 module Make (K : Key) = struct
38 type state = Waiting | Running | Done
40 mutable state : state;
41 f : unit -> unit; (* The function to run the job. *)
45 (* Lock preventing multiple jobs with the same key from
46 * running at the same time.
49 mutable q : job list; (* List of jobs on this queue. *)
(* Build an empty queue: a freshly created per-queue lock and no
 * jobs on the list yet.
 *)
let new_queue () =
  let q_lock = Mutex.create () in
  { q_lock; q = [] }
(* All of the shared state below is protected by this lock, which
 * you must hold before using any of it.
 *)
56 let lock = Mutex.create ()
(* Jobs are queued on separate queues according to their key.
 * qs is a map from each key to the queue of jobs for that key.
 *)
61 module Qs = Map.Make (K)
(* Threads which are idle wait on this condition.  This is
 * signalled when:
 * - a new job is added (idle threads may be able to run it)
 * - a job finishes (idle threads may be able to run another
 *   job which has the same key as the one which finished)
 *)
70 let idle = Condition.create ()
(* The number of threads which are running or idle but NOT waiting.
 * This starts as one because the main thread is running.  A thread
 * which is waiting is essentially blocking another job which
 * could run, so we should start a new thread.  A thread which
 * is idle, on the other hand, is not blocking anything from
 * running; it is idle because there is nothing that can be run.
 *
 * We aim to keep this <= Cmdline.nr_jobs.
 *)
83 (* If stop_all is called, this is set to true and we stop
88 (* The worker thread. *)
90 let id = Thread.id (Thread.self ()) in
93 while not !stop && !ready <= Cmdline.nr_jobs () do
94 (* See if there's any queue with a job which is ready to run. *)
95 Cmdline.debug "thread %d: checking for a runnable queue" id;
96 match get_runnable_queue () with
98 (* Nothing that we can run, go idle. This also drops
99 * the lock so other threads can examine the queue.
101 Cmdline.debug "thread %d: idle" id;
102 Condition.wait idle lock;
104 (* Note that q.q_lock is now held by this thread, and q.q
105 * is non-empty. Pick the job off the head of this queue.
107 let job = List.hd q.q in
110 (* Run the job, dropping the main lock while running. *)
111 job.state <- Running;
113 Cmdline.debug "thread %d: running job" id;
115 Cmdline.debug "thread %d: finished job" id;
118 (* Since we have finished a job, it may be that other
119 * idle threads could now run (if a job with the same
122 Mutex.unlock q.q_lock;
123 Condition.broadcast idle
(* Check all the queues to see if there is any job which can run.
 * The lock must be held when calling this function.  If a runnable
 * queue is found, it is locked before being returned.
 *)
132 and get_runnable_queue () =
134 let qs = List.map snd (Qs.bindings !qs) in
135 Some (List.find is_runnable_queue qs)
(* Decide whether a queue can be run right now.  A queue with no
 * jobs is never runnable.  Otherwise we try to take the queue's
 * own lock without blocking: the result of Mutex.try_lock is the
 * answer, so when this returns true the queue lock has been
 * acquired as a side effect.
 *)
and is_runnable_queue queue =
  match queue with
  | { q = []; _ } -> false
  | { q_lock; _ } -> Mutex.try_lock q_lock
(* A group is a mutable reference to the list of jobs that have
 * been submitted to it.
 *)
type group = job list ref

(* Make a new group which initially contains no jobs. *)
let new_group () = ref []
151 (* Submit a new job. *)
152 let start group key f =
153 let id = Thread.id (Thread.self ()) in
154 Cmdline.debug "thread %d: submitting new job" id;
156 let job = { state = Waiting; f } in
157 group := job :: !group;
159 (* Put the job on the queue associated with this key. *)
163 let q = new_queue () in (* Allocate a new queue for this key. *)
164 qs := Qs.add key q !qs;
168 (* Wake up any idle threads. *)
169 Condition.signal idle;
172 (* Wait for all jobs in the group to be done. *)
174 let id = Thread.id (Thread.self ()) in
176 while not (all_done group); do
178 (* Start more threads if fewer than nr_jobs threads are ready. *)
179 let needed = Cmdline.nr_jobs () - !ready in
181 ignore (Array.init needed (Thread.create worker));
183 Cmdline.debug "thread %d: waiting for group to complete" id;
184 Condition.wait idle lock;
(* Test whether every job in the group has reached the Done state.
 * An empty group counts as done.  Note you must hold the main lock
 * while calling this.
 *)
and all_done group =
  let finished { state; _ } =
    match state with
    | Done -> true
    | Waiting | Running -> false
  in
  List.for_all finished !group
196 (* All threads will exit after running jobs if this is set. *)
199 Condition.wait idle lock;