From: Richard W.M. Jones Date: Tue, 28 Feb 2012 12:56:35 +0000 (+0000) Subject: Implement pre functions. X-Git-Tag: 0.0.4~1 X-Git-Url: http://git.annexia.org/?p=whenjobs.git;a=commitdiff_plain;h=2ce29ff559f9bc36733ab2dde5b657eaa76ea8a6 Implement pre functions. --- diff --git a/TODO b/TODO index a4e253b..88bd0ad 100644 --- a/TODO +++ b/TODO @@ -1,16 +1,12 @@ Allow the output of a job to be listed. This is pretty easy since we know the tmpdir and the output file. -How to deal with stuck / long-running jobs? +For Whentools.once and Whentools.max: - pre() and post() - (instead of cleanup) + - Add a ~group parameter so we can run no more than "n" jobs in one + group. - - We should have instance/timeout settings that allows us to deal - with jobs that run too long: - - job "foo" timeout 1 hour ... # timeout the job after an hour - job "bar" single instance ... # only one instance can run - job "bar" kill previous ... # kill previous if still running + - Add a ~kill_oldest (bool) parameter so we can kill the oldest jobs + and start the new job. Implement native dynlink. diff --git a/daemon/daemon.ml b/daemon/daemon.ml index 25b81b7..ac0f1ff 100644 --- a/daemon/daemon.ml +++ b/daemon/daemon.ml @@ -329,57 +329,87 @@ and run_job job = serial | _ -> assert false in - Syslog.notice "running %s (JOBSERIAL=%s)" - job.job_name (string_of_big_int serial); - - (* Create a temporary directory. The current directory of the job - * will be in this directory. The directory is removed when the - * child process exits. + (* Call the pre-condition script. Note this may decide not to run + * the job by returning false. *) - let dir = tmpdir () in - - let pid = fork () in - if pid = 0 then ( (* child process running the job *) - chdir dir; - - (* Set environment variables corresponding to each variable. *) - List.iter - (fun (name, value) -> putenv name (string_of_variable value)) - (Whenstate.get_variables !state); - - (* Set the $JOBNAME environment variable. *) - putenv "JOBNAME" job.job_name; - - (* Create a temporary file containing the shell script fragment. *) - let script = dir // "script.sh" in - let chan = open_out script in - fprintf chan "set -e\n"; (* So that jobs exit on error. *) - output_string chan job.job_script.sh_script; - close_out chan; - chmod script 0o700; - - let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in - - (* Set output to file. *) - let output = dir // "output.txt" in - let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in - dup2 fd stdout; - dup2 fd stderr; - close fd; - - (* Execute the shell script. *) - (try execvp shell [| shell; "-c"; script |]; - with Unix_error (err, fn, _) -> - Syslog.error "%s failed: %s: %s" fn script (error_message err) + let pre_condition () = + match job.job_pre with + | None -> true + | Some pre -> + let rs = ref [] in + IntMap.iter ( + fun pid (job, _, serial, start_time) -> + let r = { pirun_job_name = job.job_name; + pirun_serial = serial; + pirun_start_time = start_time; + pirun_pid = pid } in + rs := r :: !rs + ) !runningmap; + let preinfo = { + pi_job_name = job.job_name; + pi_serial = serial; + pi_variables = Whenstate.get_variables !state; + pi_running = !rs; + } in + pre preinfo + in + if pre_condition () then ( + Syslog.notice "running %s (JOBSERIAL=%s)" + job.job_name (string_of_big_int serial); + + (* Create a temporary directory. The current directory of the job + * will be in this directory. The directory is removed when the + * child process exits. + *) + let dir = tmpdir () in + + let pid = fork () in + if pid = 0 then ( (* child process running the job *) + chdir dir; + + (* Set environment variables corresponding to each variable. *) + List.iter + (fun (name, value) -> putenv name (string_of_variable value)) + (Whenstate.get_variables !state); + + (* Set the $JOBNAME environment variable. *) + putenv "JOBNAME" job.job_name; + + (* Create a temporary file containing the shell script fragment. *) + let script = dir // "script.sh" in + let chan = open_out script in + fprintf chan "set -e\n"; (* So that jobs exit on error. *) + output_string chan job.job_script.sh_script; + close_out chan; + chmod script 0o700; + + let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in + + (* Set output to file. *) + let output = dir // "output.txt" in + let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in + dup2 fd stdout; + dup2 fd stderr; + close fd; + + (* Execute the shell script. *) + (try execvp shell [| shell; "-c"; script |]; + with Unix_error (err, fn, _) -> + Syslog.error "%s failed: %s: %s" fn script (error_message err) + ); + _exit 1 ); - _exit 1 - ); - (* Remember this PID, the job and the temporary directory, so we - * can clean up when the child exits. - *) - runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap; - serialmap := BigIntMap.add serial pid !serialmap + (* Remember this PID, the job and the temporary directory, so we + * can clean up when the child exits. + *) + runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap; + serialmap := BigIntMap.add serial pid !serialmap + ) + else ( + Syslog.notice "not running %s (JOBSERIAL=%s) because pre() condition returned false" + job.job_name (string_of_big_int serial); + ) and tmpdir () = let chan = open_in "/dev/urandom" in diff --git a/lib/whenexpr.ml b/lib/whenexpr.ml index 2cb6863..c2ffa33 100644 --- a/lib/whenexpr.ml +++ b/lib/whenexpr.ml @@ -83,23 +83,6 @@ type shell_script = { sh_script : string; } -type preinfo = { - pi_job_name : string; - pi_serial : Big_int.big_int; -} - -type result = { - res_job_name : string; - res_serial : Big_int.big_int; - res_code : int; - res_tmpdir : string; - res_output : string; - res_start_time : float; -} - -type pre = preinfo -> bool -type post = result -> unit - type variable = | T_unit | T_bool of bool @@ -123,6 +106,31 @@ let rpc_of_variable = function type variables = variable StringMap.t +type preinfo = { + pi_job_name : string; + pi_serial : Big_int.big_int; + pi_variables : (string * variable) list; + pi_running : preinfo_running_job list; +} +and preinfo_running_job = { + pirun_job_name : string; + pirun_serial : Big_int.big_int; + pirun_start_time : float; + pirun_pid : int; +} + +type result = { + res_job_name : string; + res_serial : Big_int.big_int; + res_code : int; + res_tmpdir : string; + res_output : string; + res_start_time : float; +} + +type pre = preinfo -> bool +type post = result -> unit + type job_cond = | When_job of whenexpr | Every_job of periodexpr diff --git a/lib/whenexpr.mli b/lib/whenexpr.mli index 818c75d..7310387 100644 --- a/lib/whenexpr.mli +++ b/lib/whenexpr.mli @@ -60,9 +60,33 @@ type shell_script = { } (** A shell script. *) +type variable = + | T_unit + | T_bool of bool + | T_string of string + | T_int of Big_int.big_int + | T_float of float +(** Typed variable (see also [whenproto.x]) *) + +val string_of_variable : variable -> string + +val variable_of_rpc : Whenproto_aux.variable -> variable +val rpc_of_variable : variable -> Whenproto_aux.variable + +type variables = variable Whenutils.StringMap.t +(** A set of variables. *) + type preinfo = { pi_job_name : string; (** Job name. *) pi_serial : Big_int.big_int; (** Job serial number. *) + pi_variables : (string * variable) list; (** Variables set in job. *) + pi_running : preinfo_running_job list; (** List of running jobs. *) +} +and preinfo_running_job = { + pirun_job_name : string; (** Running job name. *) + pirun_serial : Big_int.big_int; (** Running job serial number. *) + pirun_start_time : float; (** Running job start time. *) + pirun_pid : int; (** Running job process ID. *) } (** Information available to pre function before the job runs. *) @@ -80,22 +104,6 @@ type pre = preinfo -> bool type post = result -> unit (** Pre and post functions. *) -type variable = - | T_unit - | T_bool of bool - | T_string of string - | T_int of Big_int.big_int - | T_float of float -(** Typed variable (see also [whenproto.x]) *) - -val string_of_variable : variable -> string - -val variable_of_rpc : Whenproto_aux.variable -> variable -val rpc_of_variable : variable -> Whenproto_aux.variable - -type variables = variable Whenutils.StringMap.t -(** A set of variables. *) - type job_cond = | When_job of whenexpr (** when ... : << >> *) | Every_job of periodexpr (** every ... : << >> *) diff --git a/lib/whentools.ml b/lib/whentools.ml index cac698a..569421d 100644 --- a/lib/whentools.ml +++ b/lib/whentools.ml @@ -39,6 +39,24 @@ let set_variable_float name value = check_valid_variable_name name; Whenfile.set_variable name (T_float value) +type preinfo = Whenexpr.preinfo + +let max n preinfo = + let name = preinfo.pi_job_name in + + (* Count running jobs with the same name. *) + let count = List.fold_left ( + fun count -> + function + | { pirun_job_name = n } when n = name -> count + 1 + | _ -> count + ) 0 preinfo.pi_running in + + (* Only let this job run if there are fewer than n already running. *) + count < n + +let one () preinfo = max 1 preinfo + type result = Whenexpr.result let mailto ?(only_on_failure = false) ?from email result = diff --git a/lib/whentools.mli b/lib/whentools.mli index b5fa858..773af14 100644 --- a/lib/whentools.mli +++ b/lib/whentools.mli @@ -34,6 +34,12 @@ val set_variable_string : string -> string -> unit val set_variable_float : string -> float -> unit (** Set variable (just a wrapper around {!Whenfile.set_variable}). *) +type preinfo = Whenexpr.preinfo + +val max : int -> preinfo -> bool +val one : unit -> preinfo -> bool +(** Pre functions to limit number of jobs running. *) + type result = Whenexpr.result val mailto : ?only_on_failure:bool -> ?from:string -> string -> result -> unit diff --git a/tools/whenjobs.pod b/tools/whenjobs.pod index b8c76a6..fe3041b 100644 --- a/tools/whenjobs.pod +++ b/tools/whenjobs.pod @@ -592,6 +592,37 @@ this example: Whentools.set_variable "name" "Richard"; Whentools.set_variable_int "counter" 0 +=head3 PRE FUNCTIONS + +Before a job runs, you can arrange that a C
 function is called.
+This function may decide not to run the job (by returning C).
+
+One use for this is to prevent a particular job from running if there
+is already an instance of the same job running:
+
+ job "only one"
+ pre (Whentools.one ())
+ every 10 seconds :
+ <<
+   # Takes longer than 10 seconds to run, but 'Whentools.one ()'
+   # will ensure only one is ever running.
+   sleep 11
+ >>
+
+When using pre functions, jobs must be given an explicit name, ie.
+you must use the C statement.
+
+A number of pre functions are available in the library; see below.
+
+You can also write your own post functions (in OCaml).  The function
+is passed one argument which is a C struct, defined
+below.  It should return a boolean: C if the job should run, and
+C if the job should not run.
+
+Note that a fresh serial number (see L) is assigned to
+each run, whether or not the job actually runs because of
+preconditions.
+
 =head3 POST FUNCTIONS
 
 After a job runs, you can control what happens to its output by
@@ -660,6 +691,22 @@ Here are some examples of using the mailto function:
    # do something
  >>
 
+=item B I
+
+This built-in pre function ensures that a maximum of I instances of
+the job are running.
+
+It checks the list of running jobs, and if I or more instances are
+already running, then it returns C, which ensures that the new
+job is not started.
+
+=item B I<()>
+
+This built-in pre function ensures that only one instance of the job
+is running.  It is the same as calling:
+
+ Whentools.max 1
+
 =item B I I
 
 Set variable I to the string.
@@ -687,6 +734,24 @@ Set variable I to the floating point value I.
 
 =over 4
 
+=item B
+
+This structure is passed to pre functions.  It has the following
+fields:
+
+ type preinfo = {
+   pi_job_name : string;           # Job name.
+   pi_serial : Big_int.big_int;    # Job serial number.
+   pi_variables : (string * variable) list; # Variables set in job.
+   pi_running : preinfo_running_job list;   # List of running jobs.
+ }
+ and preinfo_running_job = {
+   pirun_job_name : string;        # Running job name.
+   pirun_serial : Big_int.big_int; # Running job serial number.
+   pirun_start_time : float;       # Running job start time.
+   pirun_pid : int;                # Running job process ID.
+ }
+
 =item B
 
 This structure is passed to post functions.  It has the following