From: Richard W.M. Jones Date: Wed, 8 Jan 2020 11:43:26 +0000 (+0000) Subject: Implement parallel jobs (-j option). X-Git-Tag: v'0.2'~59 X-Git-Url: http://git.annexia.org/?a=commitdiff_plain;h=2a9d33a300ac414c21679c520bc6434d48f499a9;p=goals.git Implement parallel jobs (-j option). --- diff --git a/Goalfile.in b/Goalfile.in index 39a9587..f0aaab2 100644 --- a/Goalfile.in +++ b/Goalfile.in @@ -56,7 +56,7 @@ let OCAMLFIND = "@OCAMLFIND@" let OCAMLLEX = "@OCAMLLEX@" # XXX let OCAMLFLAGS = [ "-g", "-safe-string", "-warn-error", "CDEFLMPSUVYZX+52-3" ] -let OCAMLPACKAGES = [ "-package", "str,unix", "-I", "src" ] +let OCAMLPACKAGES = [ "-package", "str,unix,threads", "-I", "src", "-thread" ] #let OCAMLFLAGS = "@OCAMLFLAGS@" #let OCAMLPACKAGES = "@OCAMLPACKAGES@" @@ -65,6 +65,7 @@ let objects = [ "src/config.cmx", "src/utils.cmx", "src/cmdline.cmx", + "src/jobs.cmx", "src/ast.cmx", "src/parser.cmx", "src/lexer.cmx", diff --git a/configure.ac b/configure.ac index e3fd9ae..e8fd7e5 100644 --- a/configure.ac +++ b/configure.ac @@ -55,7 +55,7 @@ AC_CHECK_PROG([POD2MAN], [pod2man], [pod2man], [ dnl Substitute OCaml flags and packages. AC_SUBST([OCAMLFLAGS], ["-g -safe-string -warn-error CDEFLMPSUVYZX+52-3"]) -AC_SUBST([OCAMLPACKAGES], ["-package str,unix"]) +AC_SUBST([OCAMLPACKAGES], ["-package str,unix,threads -thread"]) dnl Produce output files. AC_CONFIG_HEADERS([config.h]) diff --git a/docs/goals.pod b/docs/goals.pod index c94cadb..e2b1af1 100644 --- a/docs/goals.pod +++ b/docs/goals.pod @@ -8,7 +8,7 @@ goals - an experimental tool that generalizes “make” goals ['TARGET'] ['VAR=VALUE'] [-C|--directory DIRECTORY] [-d] [-f|--file Goalfile] - [-I|--include DIRECTORY] [--no-prelude] + [-I|--include DIRECTORY] [-j|--jobs JOBS] [--no-prelude] goals --help @@ -76,6 +76,15 @@ Note that if a relative path is given here, it is relative to the directory specified with the I<-C> option, or to the current directory if I<-C> was not used. 
+=item B<-j> JOBS + +=item B<--jobs> JOBS + +Set the maximum number of commands that can run at the same time. +Unlike make, goals defaults to running in parallel, setting the +default to the number of cores on the machine. To disable parallel +jobs, you must use S<I<-j 1>>. + =item B<--no-prelude> Do not load F<prelude.gl> from C<%stdlib>. The default is that the diff --git a/src/cmdline.ml b/src/cmdline.ml index fdf7ce6..0549833 100644 --- a/src/cmdline.ml +++ b/src/cmdline.ml @@ -51,13 +51,14 @@ let () = Sys.executable_name stdlibdir let input_file, - debug_flag, directory, includes, use_prelude, anon_vars, targets = + debug_flag, directory, includes, nr_jobs, use_prelude, anon_vars, targets = let args = ref [] in let debug_flag = ref false in let directory = ref "." in let input_file = ref "Goalfile" in let includes = ref [stdlibdir] in let add_include dir = includes := dir :: !includes in + let nr_jobs = ref 4 (* XXX use nproc *) in let use_prelude = ref true in let argspec = [ @@ -75,6 +76,10 @@ let input_file, "dir Add include directory"; "--include", Arg.String add_include, "dir Add include directory"; + "-j", Arg.Set_int nr_jobs, + "jobs Set number of parallel jobs"; + "--jobs", Arg.Set_int nr_jobs, + "jobs Set number of parallel jobs"; "--no-prelude",Arg.Clear use_prelude, " Do not automatically use prelude.gl from stdlib"; "-v", Arg.Unit print_version, @@ -92,6 +97,9 @@ let input_file, let input_file = !input_file in (* Don't reverse includes - we want newer -I options to take precedence. *) let includes = !includes in + let nr_jobs = !nr_jobs in + if nr_jobs < 1 then + failwithf "%s: -j must be >= 1" Sys.executable_name; let use_prelude = !use_prelude in (* Get the anon var assignments and targets. *) @@ -109,7 +117,7 @@ let input_file, ) anon_vars in input_file, - debug_flag, directory, includes, use_prelude, anon_vars, targets + debug_flag, directory, includes, nr_jobs, use_prelude, anon_vars, targets (* Create the debug function.
*) let debug fs = diff --git a/src/cmdline.mli b/src/cmdline.mli index f29bb84..d2e889f 100644 --- a/src/cmdline.mli +++ b/src/cmdline.mli @@ -42,6 +42,9 @@ val directory : string val includes : string list (** Get list of include directories (-I option). *) +val nr_jobs : int +(** Number of jobs (-j option). *) + val use_prelude : bool (** True if we should load the prelude, or false if --no-prelude. *) diff --git a/src/jobs.ml b/src/jobs.ml new file mode 100644 index 0000000..f9ead52 --- /dev/null +++ b/src/jobs.ml @@ -0,0 +1,188 @@ +(* Goals parallel jobs. + * Copyright (C) 2020 Richard W.M. Jones + * Copyright (C) 2020 Red Hat Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + *) + +module type Key = sig + type t + val compare : t -> t -> int + val to_string : t -> string +end + +module type Jobs = sig + type key + type group + val new_group : unit -> group + val start : group -> key -> (unit -> unit) -> unit + val wait : group -> unit +end + +module Make (K : Key) = struct + type key = K.t + + type state = Waiting | Running | Done + type job = { + mutable state : state; + f : unit -> unit; (* The function to run the job. *) + } + + type queue = { + (* Lock preventing multiple jobs with the same key from + * running at the same time. 
+ *) + q_lock : Mutex.t; + mutable q : job list; (* List of jobs on this queue. *) + } + let new_queue () = { q_lock = Mutex.create (); q = [] } + + (* All of the shared state below is protected by this lock that + * you must hold before using any of it. + *) + let lock = Mutex.create () + + (* Jobs are queued on separate queues according to their key. + * qs is a map of the key to the list of jobs in that queue. + *) + module Qs = Map.Make (K) + let qs = ref Qs.empty + + (* Threads which are idle wait on this condition. This is + * signalled when: + * - a new job is added (idle threads may be able to run it) + * - a job finishes (idle threads may be able to run another + * job which has the same key as the one which finished) + *) + let idle = Condition.create () + + (* Threads which are running or idle but NOT waiting. This + * starts as one because the main thread is running. A thread + * which is waiting is essentially blocking another job which + * could run, so we should start a new thread. A thread which + * is idle on the other hand is not blocking anything from + * running, it's idle because there is nothing that can be run. + * + * We aim to keep this <= Cmdline.nr_jobs. + *) + let ready = ref 1 + + (* The worker thread. *) + let rec worker _ = + let id = Thread.id (Thread.self ()) in + Mutex.lock lock; + incr ready; + while !ready <= Cmdline.nr_jobs do + (* See if there's any queue with a job which is ready to run. *) + Cmdline.debug "thread %d: checking for a runnable queue" id; + match get_runnable_queue () with + | None -> + (* Nothing that we can run, go idle. This also drops + * the lock so other threads can examine the queue. + *) + Cmdline.debug "thread %d: idle" id; + Condition.wait idle lock; + | Some q -> + (* Note that q.q_lock is now held by this thread, and q.q + * is non-empty. Pick the job off the head of this queue. + *) + let job = List.hd q.q in + q.q <- List.tl q.q; + + (* Run the job, dropping the main lock while running. 
*) + job.state <- Running; + Mutex.unlock lock; + Cmdline.debug "thread %d: running job" id; + job.f (); + Cmdline.debug "thread %d: finished job" id; + Mutex.lock lock; + job.state <- Done; + (* Since we have finished a job, it may be that other + * idle threads could now run (if a job with the same + * key is waiting). + *) + Mutex.unlock q.q_lock; + Condition.broadcast idle + done; + decr ready; + Mutex.unlock lock + + (* Check all the queues to see if there is any job which can run. + * The lock must be held when calling this function. This + * locks the queue if it finds one. + *) + and get_runnable_queue () = + try + let qs = List.map snd (Qs.bindings !qs) in + Some (List.find is_runnable_queue qs) + with + Not_found -> None + + (* Return true iff the queue contains jobs and no existing job + * from this queue is already running. This locks the queue + * if it returns true. + *) + and is_runnable_queue = function + | { q = [] } -> false + | { q_lock } -> Mutex.try_lock q_lock + + (* A group is simply a list of jobs. *) + type group = job list ref + let new_group () = ref [] + + (* Submit a new job. *) + let start group key f = + let id = Thread.id (Thread.self ()) in + Cmdline.debug "thread %d: submitting new job" id; + Mutex.lock lock; + let job = { state = Waiting; f } in + group := job :: !group; + + (* Put the job on the queue associated with this key. *) + let q = + try Qs.find key !qs + with Not_found -> + let q = new_queue () in (* Allocate a new queue for this key. *) + qs := Qs.add key q !qs; + q in + q.q <- q.q @ [job]; + + (* Wake up any idle threads. *) + Condition.signal idle; + Mutex.unlock lock + + (* Wait for all jobs in the group to be done. *) + let rec wait group = + let id = Thread.id (Thread.self ()) in + Mutex.lock lock; + while not (all_done group); do + decr ready; + (* Start more threads if fewer than nr_jobs threads are ready. 
*) + let needed = Cmdline.nr_jobs - !ready in + if needed > 0 then + ignore (Array.init needed (Thread.create worker)); + + Cmdline.debug "thread %d: waiting for group to complete" id; + Condition.wait idle lock; + incr ready + done; + Mutex.unlock lock + + (* Test if all jobs in a group are done. Note you must hold + * the lock. + *) + and all_done group = List.for_all (fun { state } -> state = Done) !group + +end diff --git a/src/jobs.mli b/src/jobs.mli new file mode 100644 index 0000000..697263d --- /dev/null +++ b/src/jobs.mli @@ -0,0 +1,66 @@ +(* Goals parallel jobs. + * Copyright (C) 2020 Richard W.M. Jones + * Copyright (C) 2020 Red Hat Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + *) + +(** This module manages parallel jobs. + + Jobs are grouped. You call [new_group] to create a new + group of jobs, initially empty. Then add jobs to it. Then + wait for all the jobs in the group to complete. + + To submit a job to a group use [start group key f]. [group] + is an existing group of jobs to which this is added. [key] is + a key which ensures that two identical jobs cannot be running + at the same time (across all groups). 
If two or more jobs + with the same key are submitted then only one will run and + the others will wait until the first finishes, and then another + will be picked to run and so on. Jobs with different keys run + freely in parallel, assuming there are enough threads available + to run them. + + Goals uses the goal (name + parameters) as the key to + ensure you cannot have two jobs running at the same time + which would interfere with each other by trying to build + the same target. + + To wait for a group of jobs to complete, call [wait group]. + *) + +module type Key = sig + type t + val compare : t -> t -> int + val to_string : t -> string +end + +module type Jobs = sig + type key + type group + + val new_group : unit -> group + (** Create a new empty jobs group. *) + + val start : group -> key -> (unit -> unit) -> unit + (** [start group key f] submits a job to run in the background. + The [key] ensures that two jobs with the same key cannot run + at the same time (across all groups). *) + + val wait : group -> unit + (** [wait group] waits for all of the jobs in the group to finish. *) +end + +module Make (K : Key) : Jobs with type key = K.t diff --git a/src/main.ml b/src/main.ml index 2b74a4f..63dac2b 100644 --- a/src/main.ml +++ b/src/main.ml @@ -75,7 +75,7 @@ let main () = Ast.print_env stderr env; (* Run the target expressions. *) - Run.run_targets env targets + Run.run_targets_to_completion env targets let () = try main () diff --git a/src/run.ml b/src/run.ml index 0d4fa88..9a15775 100644 --- a/src/run.ml +++ b/src/run.ml @@ -21,10 +21,38 @@ open Printf open Utils -let rec run_targets env exprs = - List.iter (run_target env) exprs +(* Goals uses the goal (name + parameters) as the key to + * ensure you cannot have two jobs running at the same time + * which would interfere with each other by trying to build + * the same target. 
+ *) +module Jobs = Jobs.Make ( + struct + type t = string * Ast.expr list + let compare = compare + let to_string (name, args) = + sprintf "%s (%s)" name + (String.concat ", " (List.map (Ast.string_expr ()) args)) + end +) + +(* Starts the target expressions running and waits for them to complete. *) +let rec run_targets_to_completion env exprs = + let group = Jobs.new_group () in + run_targets group env exprs; + Jobs.wait group + +(* This starts the targets, adding them to the jobs group, but does not + * wait for them to complete. + *) +and run_targets group env exprs = + List.iter (run_target group env) exprs -and run_target env = function +(* This starts a single target, adding the (usually single but can + * be multiple) jobs to the jobs group. It does not wait for the + * jobs to complete. + *) +and run_target group env = function | Ast.EGoalDefn _ | Ast.EFuncDefn _ | Ast.ETacticDefn _ -> assert false (* Call a goal or function. *) @@ -32,10 +60,13 @@ and run_target env = function let expr = Ast.getvar env loc name in (match expr with | Ast.EGoalDefn (_, goal) -> - run_goal env loc name args goal [] + let key = name, args in + Jobs.start group key ( + fun () -> run_goal env loc name args goal [] + ) | Ast.EFuncDefn (_, func) -> let expr = Eval.call_function env loc name args func in - run_target env expr + run_target group env expr | _ -> failwithf "%a: tried to call ‘%s’ which is not a goal or a function" Ast.string_loc loc name @@ -47,7 +78,7 @@ and run_target env = function * (strings, in future booleans, numbers, etc). *) let args = List.map (Eval.to_constant env) args in - run_tactic env loc name args + run_tactic group env loc name args (* If this is a goal then it's the same as calling goal(). If not * then look up the variable and substitute it. 
@@ -56,25 +87,25 @@ and run_target env = function let expr = Ast.getvar env loc name in (match expr with | Ast.EGoalDefn (loc, ([], _, _, _)) -> - run_target env (Ast.ECall (loc, name, [])) + run_target group env (Ast.ECall (loc, name, [])) | EGoalDefn _ -> failwithf "%a: cannot call %s() since this goal has parameters" Ast.string_loc loc name | _ -> - run_target env expr + run_target group env expr ) (* Lists are inlined when found as a target. *) | Ast.EList (loc, exprs) -> - run_targets env exprs + run_targets group env exprs (* A string (with or without substitutions) implies *file(filename). *) | Ast.ESubsts (loc, str) -> let str = Eval.substitute env loc str in - run_tactic env loc "*file" [Ast.CString str] + run_tactic group env loc "*file" [Ast.CString str] | Ast.EConstant (loc, c) -> - run_tactic env loc "*file" [c] + run_tactic group env loc "*file" [c] (* Run a goal by name. *) and run_goal env loc name args (params, patterns, deps, code) extra_deps = @@ -102,8 +133,10 @@ and run_goal env loc name args (params, patterns, deps, code) extra_deps = (List.length params) (List.length args) in List.fold_left (fun env (k, v) -> Ast.Env.add k v env) env params in - (* Check all dependencies have been updated. *) - run_targets env (deps @ extra_deps); + (* Check all dependencies have been updated. We must wait + * for these to complete before we can continue. + *) + run_targets_to_completion env (deps @ extra_deps); (* Check if any target (ie. pattern) needs to be rebuilt. * As with make, a goal with no targets is always run. @@ -199,10 +232,10 @@ and needs_rebuild env loc deps extra_deps pattern = exit 1 ) -(* Find the goal which matches the given tactic and run it. +(* Find the goal which matches the given tactic and start it. * cargs is a list of parameters (all constants). *) -and run_tactic env loc tactic cargs = +and run_tactic group env loc tactic cargs = (* This is used to print the tactic in debug and error messages only. 
*) let debug_tactic = Ast.string_expr () @@ -249,7 +282,10 @@ and run_tactic env loc tactic cargs = | [_, name, goal, args] -> (* Single goal matches, run it. *) - run_goal env loc name args goal [] + let key = name, args in + Jobs.start group key ( + fun () -> run_goal env loc name args goal [] + ) | goals -> (* Two or more goals match. Only one must have a CODE section, @@ -286,7 +322,10 @@ and run_tactic env loc tactic cargs = failwithf "%a: multiple goals found which match tactic %s, but more than one of these goals have {code} sections which is not allowed" Ast.string_loc loc debug_tactic in - run_goal env loc name args goal extra_deps + let key = name, args in + Jobs.start group key (fun () -> + run_goal env loc name args goal extra_deps + ) (* Test if pattern matches *tactic(cargs). If it does * then we return Some args where args is the arguments that must diff --git a/src/run.mli b/src/run.mli index 603ad03..6281242 100644 --- a/src/run.mli +++ b/src/run.mli @@ -17,7 +17,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. *) -val run_targets : Ast.env -> Ast.expr list -> unit +val run_targets_to_completion : Ast.env -> Ast.expr list -> unit (** This drives evaluation of the list of target expressions (in parallel) until they are complete or we reach an error. The expressions are either a list of dependencies and/or a list of