Implement parallel jobs (-j option).
authorRichard W.M. Jones <rjones@redhat.com>
Wed, 8 Jan 2020 11:43:26 +0000 (11:43 +0000)
committerRichard W.M. Jones <rjones@redhat.com>
Wed, 8 Jan 2020 18:21:29 +0000 (18:21 +0000)
Goalfile.in
configure.ac
docs/goals.pod
src/cmdline.ml
src/cmdline.mli
src/jobs.ml [new file with mode: 0644]
src/jobs.mli [new file with mode: 0644]
src/main.ml
src/run.ml
src/run.mli

index 39a9587..f0aaab2 100644 (file)
@@ -56,7 +56,7 @@ let OCAMLFIND = "@OCAMLFIND@"
 let OCAMLLEX = "@OCAMLLEX@"
 # XXX
 let OCAMLFLAGS = [ "-g", "-safe-string", "-warn-error", "CDEFLMPSUVYZX+52-3" ]
-let OCAMLPACKAGES = [ "-package", "str,unix", "-I", "src" ]
+let OCAMLPACKAGES = [ "-package", "str,unix,threads", "-I", "src", "-thread" ]
 #let OCAMLFLAGS = "@OCAMLFLAGS@"
 #let OCAMLPACKAGES = "@OCAMLPACKAGES@"
 
@@ -65,6 +65,7 @@ let objects = [
     "src/config.cmx",
     "src/utils.cmx",
     "src/cmdline.cmx",
+    "src/jobs.cmx",
     "src/ast.cmx",
     "src/parser.cmx",
     "src/lexer.cmx",
index e3fd9ae..e8fd7e5 100644 (file)
@@ -55,7 +55,7 @@ AC_CHECK_PROG([POD2MAN], [pod2man], [pod2man], [
 
 dnl Substitute OCaml flags and packages.
 AC_SUBST([OCAMLFLAGS], ["-g -safe-string -warn-error CDEFLMPSUVYZX+52-3"])
-AC_SUBST([OCAMLPACKAGES], ["-package str,unix"])
+AC_SUBST([OCAMLPACKAGES], ["-package str,unix,threads -thread"])
 
 dnl Produce output files.
 AC_CONFIG_HEADERS([config.h])
index c94cadb..e2b1af1 100644 (file)
@@ -8,7 +8,7 @@ goals - an experimental tool that generalizes “make”
 
  goals ['TARGET'] ['VAR=VALUE']
        [-C|--directory DIRECTORY] [-d] [-f|--file Goalfile]
-       [-I|--include DIRECTORY] [--no-prelude]
+       [-I|--include DIRECTORY] [-j|--jobs JOBS] [--no-prelude]
 
  goals --help
 
@@ -76,6 +76,15 @@ Note that if a relative path is given here, it is relative to the
 directory specified with the I<-C> option, or to the current directory
 if I<-C> was not used.
 
+=item B<-j> JOBS
+
+=item B<--jobs> JOBS
+
+Set the maximum number of commands that can run at the same time.
+Unlike make, goals defaults to running in parallel, setting the
+default to the number of cores on the machine.  To disable parallel
+jobs, you must use S<I<-j 1>>.
+
 =item B<--no-prelude>
 
 Do not load F<prelude.gl> from C<%stdlib>.  The default is that the
index fdf7ce6..0549833 100644 (file)
@@ -51,13 +51,14 @@ let () =
       Sys.executable_name stdlibdir
 
 let input_file,
-    debug_flag, directory, includes, use_prelude, anon_vars, targets =
+    debug_flag, directory, includes, nr_jobs, use_prelude, anon_vars, targets =
   let args = ref [] in
   let debug_flag = ref false in
   let directory = ref "." in
   let input_file = ref "Goalfile" in
   let includes = ref [stdlibdir] in
   let add_include dir = includes := dir :: !includes in
+  let nr_jobs = ref 4 (* XXX use nproc *) in
   let use_prelude = ref true in
 
   let argspec = [
@@ -75,6 +76,10 @@ let input_file,
                    "dir Add include directory";
     "--include",   Arg.String add_include,
                    "dir Add include directory";
+    "-j",          Arg.Set_int nr_jobs,
+                   "jobs Set number of parallel jobs";
+    "--jobs",      Arg.Set_int nr_jobs,
+                   "jobs Set number of parallel jobs";
     "--no-prelude",Arg.Clear use_prelude,
                    " Do not automatically use prelude.gl from stdlib";
     "-v",          Arg.Unit print_version,
@@ -92,6 +97,9 @@ let input_file,
   let input_file = !input_file in
   (* Don't reverse includes - we want newer -I options to take precedence. *)
   let includes = !includes in
+  let nr_jobs = !nr_jobs in
+  if nr_jobs < 1 then
+    failwithf "%s: -j must be >= 1" Sys.executable_name;
   let use_prelude = !use_prelude in
 
   (* Get the anon var assignments and targets. *)
@@ -109,7 +117,7 @@ let input_file,
     ) anon_vars in
 
   input_file,
-  debug_flag, directory, includes, use_prelude, anon_vars, targets
+  debug_flag, directory, includes, nr_jobs, use_prelude, anon_vars, targets
 
 (* Create the debug function. *)
 let debug fs =
index f29bb84..d2e889f 100644 (file)
@@ -42,6 +42,9 @@ val directory : string
 val includes : string list
 (** Get list of include directories (-I option). *)
 
+val nr_jobs : int
+(** Number of jobs (-j option). *)
+
 val use_prelude : bool
 (** True if we should load the prelude, or false if --no-prelude. *)
 
diff --git a/src/jobs.ml b/src/jobs.ml
new file mode 100644 (file)
index 0000000..f9ead52
--- /dev/null
@@ -0,0 +1,188 @@
+(* Goals parallel jobs.
+ * Copyright (C) 2020 Richard W.M. Jones
+ * Copyright (C) 2020 Red Hat Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ *)
+
+module type Key = sig
+  type t
+  val compare : t -> t -> int
+  val to_string : t -> string
+end
+
+module type Jobs = sig
+  type key
+  type group
+  val new_group : unit -> group
+  val start : group -> key -> (unit -> unit) -> unit
+  val wait : group -> unit
+end
+
+module Make (K : Key) = struct
+  type key = K.t
+
+  type state = Waiting | Running | Done
+  type job = {
+    mutable state : state;
+    f : unit -> unit;           (* The function to run the job. *)
+  }
+
+  type queue = {
+    (* Lock preventing multiple jobs with the same key from
+     * running at the same time.
+     *)
+    q_lock : Mutex.t;
+    mutable q : job list;       (* List of jobs on this queue. *)
+  }
+  let new_queue () = { q_lock = Mutex.create (); q = [] }
+
+  (* All of the shared state below is protected by this lock that
+   * you must hold before using any of it.
+   *)
+  let lock = Mutex.create ()
+
+  (* Jobs are queued on separate queues according to their key.
+   * qs is a map of the key to the list of jobs in that queue.
+   *)
+  module Qs = Map.Make (K)
+  let qs = ref Qs.empty
+
+  (* Threads which are idle wait on this condition.  This is
+   * signalled when:
+   *  - a new job is added (idle threads may be able to run it)
+   *  - a job finishes (idle threads may be able to run another
+   *    job which has the same key as the one which finished)
+   *)
+  let idle = Condition.create ()
+
+  (* Threads which are running or idle but NOT waiting.  This
+   * starts as one because the main thread is running.  A thread
+   * which is waiting is essentially blocking another job which
+   * could run, so we should start a new thread.  A thread which
+   * is idle on the other hand is not blocking anything from
+   * running, it's idle because there is nothing that can be run.
+   *
+   * We aim to keep this <= Cmdline.nr_jobs.
+   *)
+  let ready = ref 1
+
+  (* The worker thread. *)
+  let rec worker _ =
+    let id = Thread.id (Thread.self ()) in
+    Mutex.lock lock;
+    incr ready;
+    while !ready <= Cmdline.nr_jobs do
+      (* See if there's any queue with a job which is ready to run. *)
+      Cmdline.debug "thread %d: checking for a runnable queue" id;
+      match get_runnable_queue () with
+      | None ->
+         (* Nothing that we can run, go idle.  This also drops
+          * the lock so other threads can examine the queue.
+          *)
+         Cmdline.debug "thread %d: idle" id;
+         Condition.wait idle lock;
+      | Some q ->
+         (* Note that q.q_lock is now held by this thread, and q.q
+          * is non-empty.  Pick the job off the head of this queue.
+          *)
+         let job = List.hd q.q in
+         q.q <- List.tl q.q;
+
+         (* Run the job, dropping the main lock while running. *)
+         job.state <- Running;
+         Mutex.unlock lock;
+         Cmdline.debug "thread %d: running job" id;
+         job.f ();
+         Cmdline.debug "thread %d: finished job" id;
+         Mutex.lock lock;
+         job.state <- Done;
+         (* Since we have finished a job, it may be that other
+          * idle threads could now run (if a job with the same
+          * key is waiting).
+          *)
+         Mutex.unlock q.q_lock;
+         Condition.broadcast idle
+    done;
+    decr ready;
+    Mutex.unlock lock
+
+  (* Check all the queues to see if there is any job which can run.
+   * The lock must be held when calling this function.  This
+   * locks the queue if it finds one.
+   *)
+  and get_runnable_queue () =
+    try
+      let qs = List.map snd (Qs.bindings !qs) in
+      Some (List.find is_runnable_queue qs)
+    with
+      Not_found -> None
+
+  (* Return true iff the queue contains jobs and no existing job
+   * from this queue is already running.  This locks the queue
+   * if it returns true.
+   *)
+  and is_runnable_queue = function
+    | { q = [] } -> false
+    | { q_lock } -> Mutex.try_lock q_lock
+
+  (* A group is simply a list of jobs. *)
+  type group = job list ref
+  let new_group () = ref []
+
+  (* Submit a new job. *)
+  let start group key f =
+    let id = Thread.id (Thread.self ()) in
+    Cmdline.debug "thread %d: submitting new job" id;
+    Mutex.lock lock;
+    let job = { state = Waiting; f } in
+    group := job :: !group;
+
+    (* Put the job on the queue associated with this key. *)
+    let q =
+      try Qs.find key !qs
+      with Not_found ->
+        let q = new_queue () in (* Allocate a new queue for this key. *)
+        qs := Qs.add key q !qs;
+        q in
+    q.q <- q.q @ [job];
+
+    (* Wake up any idle threads. *)
+    Condition.signal idle;
+    Mutex.unlock lock
+
+  (* Wait for all jobs in the group to be done. *)
+  let rec wait group =
+    let id = Thread.id (Thread.self ()) in
+    Mutex.lock lock;
+    while not (all_done group); do
+      decr ready;
+      (* Start more threads if fewer than nr_jobs threads are ready. *)
+      let needed = Cmdline.nr_jobs - !ready in
+      if needed > 0 then
+        ignore (Array.init needed (Thread.create worker));
+
+      Cmdline.debug "thread %d: waiting for group to complete" id;
+      Condition.wait idle lock;
+      incr ready
+    done;
+    Mutex.unlock lock
+
+  (* Test if all jobs in a group are done.  Note you must hold
+   * the lock.
+   *)
+  and all_done group = List.for_all (fun { state } -> state = Done) !group
+
+end
diff --git a/src/jobs.mli b/src/jobs.mli
new file mode 100644 (file)
index 0000000..697263d
--- /dev/null
@@ -0,0 +1,66 @@
+(* Goals parallel jobs.
+ * Copyright (C) 2020 Richard W.M. Jones
+ * Copyright (C) 2020 Red Hat Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ *)
+
+(** This module manages parallel jobs.
+
+    Jobs are grouped.  You call [new_group] to create a new
+    group of jobs, initially empty.  Then add jobs to it.  Then
+    wait for all the jobs in the group to complete.
+
+    To submit a job to a group use [start group key f].  [group]
+    is an existing group of jobs to which this is added.  [key] is
+    a key which ensures that two identical jobs cannot be running
+    at the same time (across all groups).  If two or more jobs
+    with the same key are submitted then only one will run and
+    the others will wait until the first finishes, and then another
+    will be picked to run and so on.  Jobs with different keys run
+    freely in parallel, assuming there are enough threads available
+    to run them.
+
+    Goals uses the goal (name + parameters) as the key to
+    ensure you cannot have two jobs running at the same time
+    which would interfere with each other by trying to build
+    the same target.
+
+    To wait for a group of jobs to complete, call [wait group].
+ *)
+
+module type Key = sig
+  type t
+  val compare : t -> t -> int
+  val to_string : t -> string
+end
+
+module type Jobs = sig
+  type key
+  type group
+
+  val new_group : unit -> group
+  (** Create a new empty jobs group. *)
+
+  val start : group -> key -> (unit -> unit) -> unit
+  (** [start group key f] submits a job to run in the background.
+      The [key] ensures that two jobs with the same key cannot run
+      at the same time (across all groups). *)
+
+  val wait : group -> unit
+  (** [wait group] waits for all of the jobs in the group to finish. *)
+end
+
+module Make (K : Key) : Jobs with type key = K.t
index 2b74a4f..63dac2b 100644 (file)
@@ -75,7 +75,7 @@ let main () =
     Ast.print_env stderr env;
 
   (* Run the target expressions. *)
-  Run.run_targets env targets
+  Run.run_targets_to_completion env targets
 
 let () =
   try main ()
index 0d4fa88..9a15775 100644 (file)
@@ -21,10 +21,38 @@ open Printf
 
 open Utils
 
-let rec run_targets env exprs =
-  List.iter (run_target env) exprs
+(* Goals uses the goal (name + parameters) as the key to
+ * ensure you cannot have two jobs running at the same time
+ * which would interfere with each other by trying to build
+ * the same target.
+ *)
+module Jobs = Jobs.Make (
+  struct
+    type t = string * Ast.expr list
+    let compare = compare
+    let to_string (name, args) =
+      sprintf "%s (%s)" name
+        (String.concat ", " (List.map (Ast.string_expr ()) args))
+  end
+)
+
+(* Starts the target expressions running and waits for them to complete. *)
+let rec run_targets_to_completion env exprs =
+  let group = Jobs.new_group () in
+  run_targets group env exprs;
+  Jobs.wait group
+
+(* This starts the targets, adding them to the jobs group, but does not
+ * wait for them to complete.
+ *)
+and run_targets group env exprs =
+  List.iter (run_target group env) exprs
 
-and run_target env = function
+(* This starts a single target, adding the (usually single but can
+ * be multiple) jobs to the jobs group.  It does not wait for the
+ * jobs to complete.
+ *)
+and run_target group env = function
   | Ast.EGoalDefn _ | Ast.EFuncDefn _ | Ast.ETacticDefn _ -> assert false
 
   (* Call a goal or function. *)
@@ -32,10 +60,13 @@ and run_target env = function
      let expr = Ast.getvar env loc name in
      (match expr with
       | Ast.EGoalDefn (_, goal) ->
-         run_goal env loc name args goal []
+         let key = name, args in
+         Jobs.start group key (
+           fun () -> run_goal env loc name args goal []
+         )
       | Ast.EFuncDefn (_, func) ->
          let expr = Eval.call_function env loc name args func in
-         run_target env expr
+         run_target group env expr
       | _ ->
          failwithf "%a: tried to call ‘%s’ which is not a goal or a function"
            Ast.string_loc loc name
@@ -47,7 +78,7 @@ and run_target env = function
       * (strings, in future booleans, numbers, etc).
       *)
      let args = List.map (Eval.to_constant env) args in
-     run_tactic env loc name args
+     run_tactic group env loc name args
 
   (* If this is a goal then it's the same as calling goal().  If not
    * then look up the variable and substitute it.
@@ -56,25 +87,25 @@ and run_target env = function
      let expr = Ast.getvar env loc name in
      (match expr with
       | Ast.EGoalDefn (loc, ([], _, _, _)) ->
-         run_target env (Ast.ECall (loc, name, []))
+         run_target group env (Ast.ECall (loc, name, []))
       | EGoalDefn _ ->
          failwithf "%a: cannot call %s() since this goal has parameters"
            Ast.string_loc loc name
       | _ ->
-         run_target env expr
+         run_target group env expr
      )
 
   (* Lists are inlined when found as a target. *)
   | Ast.EList (loc, exprs) ->
-     run_targets env exprs
+     run_targets group env exprs
 
   (* A string (with or without substitutions) implies *file(filename). *)
   | Ast.ESubsts (loc, str) ->
      let str = Eval.substitute env loc str in
-     run_tactic env loc "*file" [Ast.CString str]
+     run_tactic group env loc "*file" [Ast.CString str]
 
   | Ast.EConstant (loc, c) ->
-     run_tactic env loc "*file" [c]
+     run_tactic group env loc "*file" [c]
 
 (* Run a goal by name. *)
 and run_goal env loc name args (params, patterns, deps, code) extra_deps =
@@ -102,8 +133,10 @@ and run_goal env loc name args (params, patterns, deps, code) extra_deps =
           (List.length params) (List.length args) in
     List.fold_left (fun env (k, v) -> Ast.Env.add k v env) env params in
 
-  (* Check all dependencies have been updated. *)
-  run_targets env (deps @ extra_deps);
+  (* Check all dependencies have been updated.  We must wait
+   * for these to complete before we can continue.
+   *)
+  run_targets_to_completion env (deps @ extra_deps);
 
   (* Check if any target (ie. pattern) needs to be rebuilt.
    * As with make, a goal with no targets is always run.
@@ -199,10 +232,10 @@ and needs_rebuild env loc deps extra_deps pattern =
        exit 1
      )
 
-(* Find the goal which matches the given tactic and run it.
+(* Find the goal which matches the given tactic and start it.
  * cargs is a list of parameters (all constants).
  *)
-and run_tactic env loc tactic cargs =
+and run_tactic group env loc tactic cargs =
   (* This is used to print the tactic in debug and error messages only. *)
   let debug_tactic =
     Ast.string_expr ()
@@ -249,7 +282,10 @@ and run_tactic env loc tactic cargs =
 
   | [_, name, goal, args] ->
      (* Single goal matches, run it. *)
-     run_goal env loc name args goal []
+     let key = name, args in
+     Jobs.start group key (
+       fun () -> run_goal env loc name args goal []
+     )
 
   | goals ->
      (* Two or more goals match.  Only one must have a CODE section,
@@ -286,7 +322,10 @@ and run_tactic env loc tactic cargs =
           failwithf "%a: multiple goals found which match tactic %s, but more than one of these goals have {code} sections which is not allowed"
             Ast.string_loc loc debug_tactic in
 
-     run_goal env loc name args goal extra_deps
+     let key = name, args in
+     Jobs.start group key (fun () ->
+       run_goal env loc name args goal extra_deps
+     )
 
 (* Test if pattern matches *tactic(cargs).  If it does
  * then we return Some args where args is the arguments that must
index 603ad03..6281242 100644 (file)
@@ -17,7 +17,7 @@
  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  *)
 
-val run_targets : Ast.env -> Ast.expr list -> unit
+val run_targets_to_completion : Ast.env -> Ast.expr list -> unit
 (** This drives evaluation of the list of target expressions (in
     parallel) until they are complete or we reach an error.  The
     expressions are either a list of dependencies and/or a list of