Allow the output of a job to be listed. This is pretty easy since we
know the tmpdir and the output file.
-How to deal with stuck / long-running jobs?
+For Whentools.once and Whentools.max:
- pre() and post()
- (instead of cleanup)
+ - Add a ~group parameter so we can run no more than "n" jobs in one
+ group.
- - We should have instance/timeout settings that allows us to deal
- with jobs that run too long:
-
- job "foo" timeout 1 hour ... # timeout the job after an hour
- job "bar" single instance ... # only one instance can run
- job "bar" kill previous ... # kill previous if still running
+ - Add a ~kill_oldest (bool) parameter so we can kill the oldest jobs
+ and start the new job.
Implement native dynlink.
serial
| _ -> assert false in
- Syslog.notice "running %s (JOBSERIAL=%s)"
- job.job_name (string_of_big_int serial);
-
- (* Create a temporary directory. The current directory of the job
- * will be in this directory. The directory is removed when the
- * child process exits.
+ (* Call the pre-condition script. Note this may decide not to run
+ * the job by returning false.
*)
- let dir = tmpdir () in
-
- let pid = fork () in
- if pid = 0 then ( (* child process running the job *)
- chdir dir;
-
- (* Set environment variables corresponding to each variable. *)
- List.iter
- (fun (name, value) -> putenv name (string_of_variable value))
- (Whenstate.get_variables !state);
-
- (* Set the $JOBNAME environment variable. *)
- putenv "JOBNAME" job.job_name;
-
- (* Create a temporary file containing the shell script fragment. *)
- let script = dir // "script.sh" in
- let chan = open_out script in
- fprintf chan "set -e\n"; (* So that jobs exit on error. *)
- output_string chan job.job_script.sh_script;
- close_out chan;
- chmod script 0o700;
-
- let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
-
- (* Set output to file. *)
- let output = dir // "output.txt" in
- let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in
- dup2 fd stdout;
- dup2 fd stderr;
- close fd;
-
- (* Execute the shell script. *)
- (try execvp shell [| shell; "-c"; script |];
- with Unix_error (err, fn, _) ->
- Syslog.error "%s failed: %s: %s" fn script (error_message err)
+ let pre_condition () =
+ match job.job_pre with
+ | None -> true
+ | Some pre ->
+ let rs = ref [] in
+ IntMap.iter (
+ fun pid (job, _, serial, start_time) ->
+ let r = { pirun_job_name = job.job_name;
+ pirun_serial = serial;
+ pirun_start_time = start_time;
+ pirun_pid = pid } in
+ rs := r :: !rs
+ ) !runningmap;
+ let preinfo = {
+ pi_job_name = job.job_name;
+ pi_serial = serial;
+ pi_variables = Whenstate.get_variables !state;
+ pi_running = !rs;
+ } in
+ pre preinfo
+ in
+ if pre_condition () then (
+ Syslog.notice "running %s (JOBSERIAL=%s)"
+ job.job_name (string_of_big_int serial);
+
+ (* Create a temporary directory. The current directory of the job
+ * will be in this directory. The directory is removed when the
+ * child process exits.
+ *)
+ let dir = tmpdir () in
+
+ let pid = fork () in
+ if pid = 0 then ( (* child process running the job *)
+ chdir dir;
+
+ (* Set environment variables corresponding to each variable. *)
+ List.iter
+ (fun (name, value) -> putenv name (string_of_variable value))
+ (Whenstate.get_variables !state);
+
+ (* Set the $JOBNAME environment variable. *)
+ putenv "JOBNAME" job.job_name;
+
+ (* Create a temporary file containing the shell script fragment. *)
+ let script = dir // "script.sh" in
+ let chan = open_out script in
+ fprintf chan "set -e\n"; (* So that jobs exit on error. *)
+ output_string chan job.job_script.sh_script;
+ close_out chan;
+ chmod script 0o700;
+
+ let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
+
+ (* Set output to file. *)
+ let output = dir // "output.txt" in
+ let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in
+ dup2 fd stdout;
+ dup2 fd stderr;
+ close fd;
+
+ (* Execute the shell script. *)
+ (try execvp shell [| shell; "-c"; script |];
+ with Unix_error (err, fn, _) ->
+ Syslog.error "%s failed: %s: %s" fn script (error_message err)
+ );
+ _exit 1
);
- _exit 1
- );
- (* Remember this PID, the job and the temporary directory, so we
- * can clean up when the child exits.
- *)
- runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
- serialmap := BigIntMap.add serial pid !serialmap
+ (* Remember this PID, the job and the temporary directory, so we
+ * can clean up when the child exits.
+ *)
+ runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
+ serialmap := BigIntMap.add serial pid !serialmap
+ )
+ else (
+ Syslog.notice "not running %s (JOBSERIAL=%s) because pre() condition returned false"
+ job.job_name (string_of_big_int serial);
+ )
and tmpdir () =
let chan = open_in "/dev/urandom" in
sh_script : string;
}
-type preinfo = {
- pi_job_name : string;
- pi_serial : Big_int.big_int;
-}
-
-type result = {
- res_job_name : string;
- res_serial : Big_int.big_int;
- res_code : int;
- res_tmpdir : string;
- res_output : string;
- res_start_time : float;
-}
-
-type pre = preinfo -> bool
-type post = result -> unit
-
type variable =
| T_unit
| T_bool of bool
type variables = variable StringMap.t
+type preinfo = {
+ pi_job_name : string;
+ pi_serial : Big_int.big_int;
+ pi_variables : (string * variable) list;
+ pi_running : preinfo_running_job list;
+}
+and preinfo_running_job = {
+ pirun_job_name : string;
+ pirun_serial : Big_int.big_int;
+ pirun_start_time : float;
+ pirun_pid : int;
+}
+
+type result = {
+ res_job_name : string;
+ res_serial : Big_int.big_int;
+ res_code : int;
+ res_tmpdir : string;
+ res_output : string;
+ res_start_time : float;
+}
+
+type pre = preinfo -> bool
+type post = result -> unit
+
type job_cond =
| When_job of whenexpr
| Every_job of periodexpr
}
(** A shell script. *)
+type variable =
+ | T_unit
+ | T_bool of bool
+ | T_string of string
+ | T_int of Big_int.big_int
+ | T_float of float
+(** Typed variable (see also [whenproto.x]) *)
+
+val string_of_variable : variable -> string
+
+val variable_of_rpc : Whenproto_aux.variable -> variable
+val rpc_of_variable : variable -> Whenproto_aux.variable
+
+type variables = variable Whenutils.StringMap.t
+(** A set of variables. *)
+
type preinfo = {
pi_job_name : string; (** Job name. *)
pi_serial : Big_int.big_int; (** Job serial number. *)
+ pi_variables : (string * variable) list; (** Variables set in job. *)
+ pi_running : preinfo_running_job list; (** List of running jobs. *)
+}
+and preinfo_running_job = {
+ pirun_job_name : string; (** Running job name. *)
+ pirun_serial : Big_int.big_int; (** Running job serial number. *)
+ pirun_start_time : float; (** Running job start time. *)
+ pirun_pid : int; (** Running job process ID. *)
}
(** Information available to pre function before the job runs. *)
type post = result -> unit
(** Pre and post functions. *)
-type variable =
- | T_unit
- | T_bool of bool
- | T_string of string
- | T_int of Big_int.big_int
- | T_float of float
-(** Typed variable (see also [whenproto.x]) *)
-
-val string_of_variable : variable -> string
-
-val variable_of_rpc : Whenproto_aux.variable -> variable
-val rpc_of_variable : variable -> Whenproto_aux.variable
-
-type variables = variable Whenutils.StringMap.t
-(** A set of variables. *)
-
type job_cond =
| When_job of whenexpr (** when ... : << >> *)
| Every_job of periodexpr (** every ... : << >> *)
check_valid_variable_name name;
Whenfile.set_variable name (T_float value)
+type preinfo = Whenexpr.preinfo
+
+let max n preinfo =
+ let name = preinfo.pi_job_name in
+
+ (* Count running jobs with the same name. *)
+ let count = List.fold_left (
+ fun count ->
+ function
+ | { pirun_job_name = n } when n = name -> count + 1
+ | _ -> count
+ ) 0 preinfo.pi_running in
+
+ (* Only let this job run if there are fewer than n already running. *)
+ count < n
+
+let one () preinfo = max 1 preinfo
+
type result = Whenexpr.result
let mailto ?(only_on_failure = false) ?from email result =
val set_variable_float : string -> float -> unit
(** Set variable (just a wrapper around {!Whenfile.set_variable}). *)
+type preinfo = Whenexpr.preinfo
+
+val max : int -> preinfo -> bool
+val one : unit -> preinfo -> bool
+(** Pre functions to limit number of jobs running. *)
+
type result = Whenexpr.result
val mailto : ?only_on_failure:bool -> ?from:string -> string -> result -> unit
Whentools.set_variable "name" "Richard";
Whentools.set_variable_int "counter" 0
+=head3 PRE FUNCTIONS
+
+Before a job runs, you can arrange that a C<pre> function is called.
+This function may decide not to run the job (by returning C<false>).
+
+One use for this is to prevent a particular job from running if there
+is already an instance of the same job running:
+
+ job "only one"
+ pre (Whentools.one ())
+ every 10 seconds :
+ <<
+ # Takes longer than 10 seconds to run, but 'Whentools.one ()'
+ # will ensure only one is ever running.
+ sleep 11
+ >>
+
+When using pre functions, jobs must be given an explicit name, ie.
+you must use the C<job> statement.
+
+A number of pre functions are available in the library; see below.
+
+You can also write your own post functions (in OCaml). The function
+is passed one argument which is a C<Whentools.preinfo> struct, defined
+below. It should return a boolean: C<true> if the job should run, and
+C<false> if the job should not run.
+
+Note that a fresh serial number (see L</JOBSERIAL>) is assigned to
+each run, whether or not the job actually runs because of
+preconditions.
+
=head3 POST FUNCTIONS
After a job runs, you can control what happens to its output by
# do something
>>
+=item B<Whentools.max> I<n>
+
+This built-in pre function ensures that a maximum of I<n> instances of
+the job are running.
+
+It checks the list of running jobs, and if I<n> or more instances are
+already running, then it returns C<false>, which ensures that the new
+job is not started.
+
+=item B<Whentools.one> I<()>
+
+This built-in pre function ensures that only one instance of the job
+is running. It is the same as calling:
+
+ Whentools.max 1
+
=item B<Whentools.set_variable> I<name> I<string>
Set variable I<name> to the string.
=over 4
+=item B<Whentools.preinfo>
+
+This structure is passed to pre functions. It has the following
+fields:
+
+ type preinfo = {
+ pi_job_name : string; # Job name.
+ pi_serial : Big_int.big_int; # Job serial number.
+ pi_variables : (string * variable) list; # Variables set in job.
+ pi_running : preinfo_running_job list; # List of running jobs.
+ }
+ and preinfo_running_job = {
+ pirun_job_name : string; # Running job name.
+ pirun_serial : Big_int.big_int; # Running job serial number.
+ pirun_start_time : float; # Running job start time.
+ pirun_pid : int; # Running job process ID.
+ }
+
=item B<Whentools.result>
This structure is passed to post functions. It has the following