Implement pre functions.
authorRichard W.M. Jones <rjones@redhat.com>
Tue, 28 Feb 2012 12:56:35 +0000 (12:56 +0000)
committerRichard W.M. Jones <rjones@redhat.com>
Tue, 28 Feb 2012 13:35:24 +0000 (13:35 +0000)
TODO
daemon/daemon.ml
lib/whenexpr.ml
lib/whenexpr.mli
lib/whentools.ml
lib/whentools.mli
tools/whenjobs.pod

diff --git a/TODO b/TODO
index a4e253b..88bd0ad 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,16 +1,12 @@
 Allow the output of a job to be listed.  This is pretty easy since we
 know the tmpdir and the output file.
 
-How to deal with stuck / long-running jobs?
+For Whentools.once and Whentools.max:
 
-   pre() and post()
-     (instead of cleanup)
+ - Add a ~group parameter so we can run no more than "n" jobs in one
+   group.
 
- - We should have instance/timeout settings that allows us to deal
-   with jobs that run too long:
-
-   job "foo" timeout 1 hour ...    # timeout the job after an hour
-   job "bar" single instance ...   # only one instance can run
-   job "bar" kill previous ...     # kill previous if still running
+ - Add a ~kill_oldest (bool) parameter so we can kill the oldest jobs
+   and start the new job.
 
 Implement native dynlink.
index 25b81b7..ac0f1ff 100644 (file)
@@ -329,57 +329,87 @@ and run_job job =
       serial
     | _ -> assert false in
 
-  Syslog.notice "running %s (JOBSERIAL=%s)"
-    job.job_name (string_of_big_int serial);
-
-  (* Create a temporary directory.  The current directory of the job
-   * will be in this directory.  The directory is removed when the
-   * child process exits.
+  (* Call the pre-condition script.  Note this may decide not to run
+   * the job by returning false.
    *)
-  let dir = tmpdir () in
-
-  let pid = fork () in
-  if pid = 0 then ( (* child process running the job *)
-    chdir dir;
-
-    (* Set environment variables corresponding to each variable. *)
-    List.iter
-      (fun (name, value) -> putenv name (string_of_variable value))
-      (Whenstate.get_variables !state);
-
-    (* Set the $JOBNAME environment variable. *)
-    putenv "JOBNAME" job.job_name;
-
-    (* Create a temporary file containing the shell script fragment. *)
-    let script = dir // "script.sh" in
-    let chan = open_out script in
-    fprintf chan "set -e\n"; (* So that jobs exit on error. *)
-    output_string chan job.job_script.sh_script;
-    close_out chan;
-    chmod script 0o700;
-
-    let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
-
-    (* Set output to file. *)
-    let output = dir // "output.txt" in
-    let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in
-    dup2 fd stdout;
-    dup2 fd stderr;
-    close fd;
-
-    (* Execute the shell script. *)
-    (try execvp shell [| shell; "-c"; script |];
-     with Unix_error (err, fn, _) ->
-       Syslog.error "%s failed: %s: %s" fn script (error_message err)
+  let pre_condition () =
+    match job.job_pre with
+    | None -> true
+    | Some pre ->
+      let rs = ref [] in
+      IntMap.iter (
+        fun pid (job, _, serial, start_time) ->
+          let r = { pirun_job_name = job.job_name;
+                    pirun_serial = serial;
+                    pirun_start_time = start_time;
+                    pirun_pid = pid } in
+          rs := r :: !rs
+      ) !runningmap;
+      let preinfo = {
+        pi_job_name = job.job_name;
+        pi_serial = serial;
+        pi_variables = Whenstate.get_variables !state;
+        pi_running = !rs;
+      } in
+      pre preinfo
+  in
+  if pre_condition () then (
+    Syslog.notice "running %s (JOBSERIAL=%s)"
+      job.job_name (string_of_big_int serial);
+
+    (* Create a temporary directory.  The current directory of the job
+     * will be in this directory.  The directory is removed when the
+     * child process exits.
+     *)
+    let dir = tmpdir () in
+
+    let pid = fork () in
+    if pid = 0 then ( (* child process running the job *)
+      chdir dir;
+
+      (* Set environment variables corresponding to each variable. *)
+      List.iter
+        (fun (name, value) -> putenv name (string_of_variable value))
+        (Whenstate.get_variables !state);
+
+      (* Set the $JOBNAME environment variable. *)
+      putenv "JOBNAME" job.job_name;
+
+      (* Create a temporary file containing the shell script fragment. *)
+      let script = dir // "script.sh" in
+      let chan = open_out script in
+      fprintf chan "set -e\n"; (* So that jobs exit on error. *)
+      output_string chan job.job_script.sh_script;
+      close_out chan;
+      chmod script 0o700;
+
+      let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
+
+      (* Set output to file. *)
+      let output = dir // "output.txt" in
+      let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in
+      dup2 fd stdout;
+      dup2 fd stderr;
+      close fd;
+
+      (* Execute the shell script. *)
+      (try execvp shell [| shell; "-c"; script |];
+       with Unix_error (err, fn, _) ->
+         Syslog.error "%s failed: %s: %s" fn script (error_message err)
+      );
+      _exit 1
     );
-    _exit 1
-  );
 
-  (* Remember this PID, the job and the temporary directory, so we
-   * can clean up when the child exits.
-   *)
-  runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
-  serialmap := BigIntMap.add serial pid !serialmap
+    (* Remember this PID, the job and the temporary directory, so we
+     * can clean up when the child exits.
+     *)
+    runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
+    serialmap := BigIntMap.add serial pid !serialmap
+  )
+  else (
+    Syslog.notice "not running %s (JOBSERIAL=%s) because pre() condition returned false"
+      job.job_name (string_of_big_int serial);
+  )
 
 and tmpdir () =
   let chan = open_in "/dev/urandom" in
index 2cb6863..c2ffa33 100644 (file)
@@ -83,23 +83,6 @@ type shell_script = {
   sh_script : string;
 }
 
-type preinfo = {
-  pi_job_name : string;
-  pi_serial : Big_int.big_int;
-}
-
-type result = {
-  res_job_name : string;
-  res_serial : Big_int.big_int;
-  res_code : int;
-  res_tmpdir : string;
-  res_output : string;
-  res_start_time : float;
-}
-
-type pre = preinfo -> bool
-type post = result -> unit
-
 type variable =
   | T_unit
   | T_bool of bool
@@ -123,6 +106,31 @@ let rpc_of_variable = function
 
 type variables = variable StringMap.t
 
+type preinfo = {
+  pi_job_name : string;
+  pi_serial : Big_int.big_int;
+  pi_variables : (string * variable) list;
+  pi_running : preinfo_running_job list;
+}
+and preinfo_running_job = {
+  pirun_job_name : string;
+  pirun_serial : Big_int.big_int;
+  pirun_start_time : float;
+  pirun_pid : int;
+}
+
+type result = {
+  res_job_name : string;
+  res_serial : Big_int.big_int;
+  res_code : int;
+  res_tmpdir : string;
+  res_output : string;
+  res_start_time : float;
+}
+
+type pre = preinfo -> bool
+type post = result -> unit
+
 type job_cond =
   | When_job of whenexpr
   | Every_job of periodexpr
index 818c75d..7310387 100644 (file)
@@ -60,9 +60,33 @@ type shell_script = {
 }
 (** A shell script. *)
 
+type variable =
+  | T_unit
+  | T_bool of bool
+  | T_string of string
+  | T_int of Big_int.big_int
+  | T_float of float
+(** Typed variable (see also [whenproto.x]) *)
+
+val string_of_variable : variable -> string
+
+val variable_of_rpc : Whenproto_aux.variable -> variable
+val rpc_of_variable : variable -> Whenproto_aux.variable
+
+type variables = variable Whenutils.StringMap.t
+(** A set of variables. *)
+
 type preinfo = {
   pi_job_name : string;                 (** Job name. *)
   pi_serial : Big_int.big_int;          (** Job serial number. *)
+  pi_variables : (string * variable) list; (** Variables set in job. *)
+  pi_running : preinfo_running_job list; (** List of running jobs. *)
+}
+and preinfo_running_job = {
+  pirun_job_name : string;              (** Running job name. *)
+  pirun_serial : Big_int.big_int;       (** Running job serial number. *)
+  pirun_start_time : float;             (** Running job start time. *)
+  pirun_pid : int;                      (** Running job process ID. *)
 }
 (** Information available to pre function before the job runs. *)
 
@@ -80,22 +104,6 @@ type pre = preinfo -> bool
 type post = result -> unit
 (** Pre and post functions. *)
 
-type variable =
-  | T_unit
-  | T_bool of bool
-  | T_string of string
-  | T_int of Big_int.big_int
-  | T_float of float
-(** Typed variable (see also [whenproto.x]) *)
-
-val string_of_variable : variable -> string
-
-val variable_of_rpc : Whenproto_aux.variable -> variable
-val rpc_of_variable : variable -> Whenproto_aux.variable
-
-type variables = variable Whenutils.StringMap.t
-(** A set of variables. *)
-
 type job_cond =
   | When_job of whenexpr                (** when ... : << >> *)
   | Every_job of periodexpr             (** every ... : << >> *)
index cac698a..569421d 100644 (file)
@@ -39,6 +39,24 @@ let set_variable_float name value =
   check_valid_variable_name name;
   Whenfile.set_variable name (T_float value)
 
+type preinfo = Whenexpr.preinfo
+
+let max n preinfo =
+  let name = preinfo.pi_job_name in
+
+  (* Count running jobs with the same name. *)
+  let count = List.fold_left (
+    fun count ->
+      function
+      | { pirun_job_name = n } when n = name -> count + 1
+      | _ -> count
+  ) 0 preinfo.pi_running in
+
+  (* Only let this job run if there are fewer than n already running. *)
+  count < n
+
+let one () preinfo = max 1 preinfo
+
 type result = Whenexpr.result
 
 let mailto ?(only_on_failure = false) ?from email result =
index b5fa858..773af14 100644 (file)
@@ -34,6 +34,12 @@ val set_variable_string : string -> string -> unit
 val set_variable_float : string -> float -> unit
 (** Set variable (just a wrapper around {!Whenfile.set_variable}). *)
 
+type preinfo = Whenexpr.preinfo
+
+val max : int -> preinfo -> bool
+val one : unit -> preinfo -> bool
+(** Pre functions to limit number of jobs running. *)
+
 type result = Whenexpr.result
 
 val mailto : ?only_on_failure:bool -> ?from:string -> string -> result -> unit
index b8c76a6..fe3041b 100644 (file)
@@ -592,6 +592,37 @@ this example:
    Whentools.set_variable "name" "Richard";
    Whentools.set_variable_int "counter" 0
 
+=head3 PRE FUNCTIONS
+
+Before a job runs, you can arrange that a C<pre> function is called.
+This function may decide not to run the job (by returning C<false>).
+
+One use for this is to prevent a particular job from running if there
+is already an instance of the same job running:
+
+ job "only one"
+ pre (Whentools.one ())
+ every 10 seconds :
+ <<
+   # Takes longer than 10 seconds to run, but 'Whentools.one ()'
+   # will ensure only one is ever running.
+   sleep 11
+ >>
+
+When using pre functions, jobs must be given an explicit name, ie.
+you must use the C<job> statement.
+
+A number of pre functions are available in the library; see below.
+
+You can also write your own post functions (in OCaml).  The function
+is passed one argument which is a C<Whentools.preinfo> struct, defined
+below.  It should return a boolean: C<true> if the job should run, and
+C<false> if the job should not run.
+
+Note that a fresh serial number (see L</JOBSERIAL>) is assigned to
+each run, whether or not the job actually runs because of
+preconditions.
+
 =head3 POST FUNCTIONS
 
 After a job runs, you can control what happens to its output by
@@ -660,6 +691,22 @@ Here are some examples of using the mailto function:
    # do something
  >>
 
+=item B<Whentools.max> I<n>
+
+This built-in pre function ensures that a maximum of I<n> instances of
+the job are running.
+
+It checks the list of running jobs, and if I<n> or more instances are
+already running, then it returns C<false>, which ensures that the new
+job is not started.
+
+=item B<Whentools.one> I<()>
+
+This built-in pre function ensures that only one instance of the job
+is running.  It is the same as calling:
+
+ Whentools.max 1
+
 =item B<Whentools.set_variable> I<name> I<string>
 
 Set variable I<name> to the string.
@@ -687,6 +734,24 @@ Set variable I<name> to the floating point value I<f>.
 
 =over 4
 
+=item B<Whentools.preinfo>
+
+This structure is passed to pre functions.  It has the following
+fields:
+
+ type preinfo = {
+   pi_job_name : string;           # Job name.
+   pi_serial : Big_int.big_int;    # Job serial number.
+   pi_variables : (string * variable) list; # Variables set in job.
+   pi_running : preinfo_running_job list;   # List of running jobs.
+ }
+ and preinfo_running_job = {
+   pirun_job_name : string;        # Running job name.
+   pirun_serial : Big_int.big_int; # Running job serial number.
+   pirun_start_time : float;       # Running job start time.
+   pirun_pid : int;                # Running job process ID.
+ }
+
 =item B<Whentools.result>
 
 This structure is passed to post functions.  It has the following