From f35f462b83f860958da38347d30b45986b8f4774 Mon Sep 17 00:00:00 2001 From: "Richard W.M. Jones" Date: Thu, 23 Feb 2012 22:09:36 +0000 Subject: [PATCH] Add commands to list, cancel, start jobs. --- daemon/daemon.ml | 53 ++++++++++++++++++++++++++++++++++------- lib/whenexpr.ml | 1 + lib/whenexpr.mli | 1 + lib/whenproto.x | 16 +++++++++++++ lib/whenstate.ml | 2 +- lib/whenstate.mli | 2 +- lib/whenutils.ml | 6 +++++ lib/whenutils.mli | 33 +++++++++++++++++++++++++ tools/whenjobs.ml | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ tools/whenjobs.pod | 39 ++++++++++++++++++++++++++---- 10 files changed, 208 insertions(+), 15 deletions(-) diff --git a/daemon/daemon.ml b/daemon/daemon.ml index ddbb9cd..c3e1abd 100644 --- a/daemon/daemon.ml +++ b/daemon/daemon.ml @@ -32,11 +32,14 @@ let jobsdir = ref "" (* The state. *) let state = ref Whenstate.empty -(* Jobs that are running: a map of PID -> (job, tmpdir, serial). +(* Jobs that are running: a map of PID -> (job, tmpdir, serial, start_time). * Note that the job may no longer exist *OR* it may have been renamed, * eg. if the jobs file was reloaded. *) -let running = ref IntMap.empty +let runningmap = ref IntMap.empty + +(* Serial numbers of running jobs. Map of serial -> PID (in runningmap). *) +let serialmap = ref BigIntMap.empty (* Was debugging requested on the command line? *) let debug = ref false @@ -72,6 +75,9 @@ let rec init j d = ~proc_get_variable ~proc_get_variable_names ~proc_exit_daemon + ~proc_get_jobs + ~proc_cancel_job + ~proc_start_job (Rpc_server.Unix addr) Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *) Rpc.Socket @@ -133,6 +139,34 @@ and proc_exit_daemon () = server := None; `ok +and proc_get_jobs () = + let running = Array.of_list (IntMap.values !runningmap) in + Array.map ( + fun (job, dir, serial, start_time) -> + { Whenproto_aux.job_name = job.job_name; + job_serial = string_of_big_int serial; + job_tmpdir = dir; job_start_time = Int64.of_float start_time } + ) running + +and proc_cancel_job serial = + try + let serial = big_int_of_string serial in + let pid = BigIntMap.find serial !serialmap in + kill pid 15; + `ok + with + | Not_found -> `error "job not found" + | exn -> `error (Printexc.to_string exn) + +and proc_start_job jobname = + try + let job = Whenstate.get_job !state jobname in + run_job job; + `ok + with + | Not_found -> `error "job not found" + | exn -> `error (Printexc.to_string exn) + (* Reload the jobs file. *) and reload_file () = let file = sprintf "%s/jobs.cmo" !jobsdir in @@ -344,7 +378,8 @@ and run_job job = (* Remember this PID, the job and the temporary directory, so we * can clean up when the child exits. *) - running := IntMap.add pid (job, dir, serial) !running + runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap; + serialmap := BigIntMap.add serial pid !serialmap and tmpdir () = let chan = open_in "/dev/urandom" in @@ -362,13 +397,14 @@ and handle_sigchld _ = let pid, status = waitpid [WNOHANG] 0 in if pid > 0 then ( (* Look up the PID in the running jobs map. *) - let job, dir, serial = IntMap.find pid !running in - running := IntMap.remove pid !running; - cleanup_job job dir serial status + let job, dir, serial, time = IntMap.find pid !runningmap in + runningmap := IntMap.remove pid !runningmap; + serialmap := BigIntMap.remove serial !serialmap; + cleanup_job job dir serial time status ) with Unix_error _ | Not_found -> () -and cleanup_job job dir serial status = +and cleanup_job job dir serial time status = (* If there is a cleanup function, run it. *) (match job.job_cleanup with | None -> () @@ -382,7 +418,8 @@ and cleanup_job job dir serial status = res_serial = serial; res_code = code; res_tmpdir = dir; - res_output = dir // "output.txt" + res_output = dir // "output.txt"; + res_start_time = time } in try cleanup result with diff --git a/lib/whenexpr.ml b/lib/whenexpr.ml index df1665b..6e12a5d 100644 --- a/lib/whenexpr.ml +++ b/lib/whenexpr.ml @@ -89,6 +89,7 @@ type result = { res_code : int; res_tmpdir : string; res_output : string; + res_start_time : float; } type cleanup = result -> unit diff --git a/lib/whenexpr.mli b/lib/whenexpr.mli index f1b8e2e..91bb134 100644 --- a/lib/whenexpr.mli +++ b/lib/whenexpr.mli @@ -66,6 +66,7 @@ type result = { res_code : int; (** Return code from the script. *) res_tmpdir : string; (** Temporary directory. *) res_output : string; (** Filename of output from job. *) + res_start_time : float; (** When the job started. *) } (** Result of the run of a job. *) diff --git a/lib/whenproto.x b/lib/whenproto.x index 7988c78..280265d 100644 --- a/lib/whenproto.x +++ b/lib/whenproto.x @@ -24,11 +24,15 @@ /* Maximum lengths and some useful typedefs. */ const MAX_VARIABLE_NAME_LENGTH = 256; const MAX_VARIABLE_VALUE_LENGTH = 65536; +const MAX_JOB_NAME_LENGTH = 256; const MAX_BIG_INT_LENGTH = 64; /* when encoded as a string */ +const MAX_PATH_LENGTH = 4096; typedef string variable_name; typedef string string_value; +typedef string job_name; typedef string string_big_int; +typedef string path; typedef variable_name variable_name_list<>; @@ -66,6 +70,15 @@ union variable switch (variable_type t) { double f; /* C 'double' maps to an OCaml 'float' */ }; +struct job { + job_name job_name; + string_big_int job_serial; + path job_tmpdir; + hyper job_start_time; +}; + +typedef job job_list<>; + /* The API of the daemon. */ program When { version V1 { @@ -74,5 +87,8 @@ program When { variable get_variable (variable_name) = 3; variable_name_list get_variable_names (void) = 4; status exit_daemon (void) = 5; + job_list get_jobs (void) = 6; + status cancel_job (string_big_int) = 7; + status start_job (job_name) = 8; } = 1; } = 0x20008081; diff --git a/lib/whenstate.ml b/lib/whenstate.ml index ddbc302..fe53a16 100644 --- a/lib/whenstate.ml +++ b/lib/whenstate.ml @@ -115,7 +115,7 @@ let get_everyjobs t = List.filter (function { job_cond = Every_job _ } -> true | _ -> false) t.jobs let get_job t jobname = - try StringMap.find jobname t.jobmap with Not_found -> assert false + StringMap.find jobname t.jobmap let evaluate_whenjob ?(onload = false) t job = match job with diff --git a/lib/whenstate.mli b/lib/whenstate.mli index a8de795..084cd49 100644 --- a/lib/whenstate.mli +++ b/lib/whenstate.mli @@ -70,7 +70,7 @@ val get_everyjobs : t -> Whenexpr.job list (** Return all of the when-jobs / every-jobs. *) val get_job : t -> string -> Whenexpr.job -(** Return the named job. *) +(** Return the named job, or raise [Not_found]. *) val evaluate_whenjob : ?onload:bool -> t -> Whenexpr.job -> bool * t (** This evaluates the whenjob and returns [true] iff the whenjob diff --git a/lib/whenutils.ml b/lib/whenutils.ml index 583d030..ca26755 100644 --- a/lib/whenutils.ml +++ b/lib/whenutils.ml @@ -37,6 +37,12 @@ module IntMap = struct let values m = fold (fun _ v vs -> v :: vs) m [] end +module BigIntMap = struct + include Map.Make (struct type t = big_int let compare = compare_big_int end) + let keys m = fold (fun k _ ks -> k :: ks) m [] + let values m = fold (fun _ v vs -> v :: vs) m [] +end + module StringSet = Set.Make (String) let (//) = Filename.concat diff --git a/lib/whenutils.mli b/lib/whenutils.mli index 61cd4f8..00b2ff1 100644 --- a/lib/whenutils.mli +++ b/lib/whenutils.mli @@ -84,6 +84,39 @@ module IntMap : sig end (** A map from int to any type. *) +module BigIntMap : sig + type key = Big_int.big_int + type 'a t + val empty : 'a t + val is_empty : 'a t -> bool + val mem : key -> 'a t -> bool + val add : key -> 'a -> 'a t -> 'a t + (*val singleton : key -> 'a -> 'a t*) + val remove : key -> 'a t -> 'a t + (*val merge : + (key -> 'a option -> 'b option -> 'c option) -> 'a t -> 'b t -> 'c t*) + val compare : ('a -> 'a -> int) -> 'a t -> 'a t -> int + val equal : ('a -> 'a -> bool) -> 'a t -> 'a t -> bool + val iter : (key -> 'a -> unit) -> 'a t -> unit + val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b + (*val for_all : (key -> 'a -> bool) -> 'a t -> bool + val exists : (key -> 'a -> bool) -> 'a t -> bool + val filter : (key -> 'a -> bool) -> 'a t -> 'a t + val partition : (key -> 'a -> bool) -> 'a t -> 'a t * 'a t + val cardinal : 'a t -> int + val bindings : 'a t -> (key * 'a) list + val min_binding : 'a t -> key * 'a + val max_binding : 'a t -> key * 'a + val choose : 'a t -> key * 'a + val split : key -> 'a t -> 'a t * 'a option * 'a t*) + val find : key -> 'a t -> 'a + val map : ('a -> 'b) -> 'a t -> 'b t + val mapi : (key -> 'a -> 'b) -> 'a t -> 'b t + val keys : 'a t -> key list + val values : 'a t -> 'a list +end +(** A map from big_int to any type. *) + module StringSet : sig type elt = String.t type t = Set.Make(String).t diff --git a/tools/whenjobs.ml b/tools/whenjobs.ml index 8472672..bd26545 100644 --- a/tools/whenjobs.ml +++ b/tools/whenjobs.ml @@ -20,6 +20,8 @@ open Big_int open Unix open Printf +open Whenutils + (* Ensures that Whentools module is linked to the whenjobs tool. *) let _ = Whentools.set_variable @@ -79,6 +81,7 @@ let rec main () = in let argspec = Arg.align [ + "--cancel", Arg.Unit (set_mode `Cancel), " Cancel a job"; "--daemon-start", Arg.Unit (set_mode `Daemon_start), " Start the daemon"; "--daemon-status", Arg.Unit (set_mode `Daemon_status), " Display the status of the daemon"; "--daemon-stop", Arg.Unit (set_mode `Daemon_stop), " Stop the daemon"; @@ -86,10 +89,12 @@ let rec main () = "-e", Arg.Unit (set_mode `Edit), " Edit and upload the script"; "--edit", Arg.Unit (set_mode `Edit), " Edit and upload the script"; "--get", Arg.Unit (set_mode `Get), " Display the variable"; + "--jobs", Arg.Unit (set_mode `Jobs), " List running jobs"; "-l", Arg.Unit (set_mode `List), " List the script"; "--list", Arg.Unit (set_mode `List), " List the script"; "--lib", Arg.Set_string libdir, "dir Specify directory that contains pa_when.cmo"; "--set", Arg.Unit (set_mode `Set), " Set the variable"; + "--start", Arg.Unit (set_mode `Start), " Start a job manually"; "--type", Arg.Set_string typ, "bool|int|float|string|unit Set the variable type"; "--upload", Arg.Unit (set_mode `Upload), " Upload the script"; "--variables", Arg.Unit (set_mode `Variables), " Display all variables and values"; @@ -194,6 +199,27 @@ Options: unused_error args "--daemon-status"; daemon_status () + | Some `Jobs -> + unused_error args "--jobs"; + jobs () + + | Some `Cancel -> + if List.length args != 1 then ( + eprintf "whenjobs --cancel serial\n"; + suggest_help (); + exit 1 + ); + cancel_job (List.hd args) + + | Some `Start -> + if List.length args != 1 then ( + eprintf "whenjobs --start jobname\n"; + eprintf "If 'value' contains spaces, you may need to quote it.\n"; + suggest_help (); + exit 1 + ); + start_job (List.hd args) + and edit_file () = (* If there is no initial file, create an empty one containing the * tutorial. @@ -351,6 +377,50 @@ and daemon_restart () = and daemon_status () = assert false +and jobs () = + let client = start_client () in + let jobs = Whenproto_clnt.When.V1.get_jobs client () in + stop_client client; + + let cmp { Whenproto_aux.job_name = name1; job_serial = serial1 } + { Whenproto_aux.job_name = name2; job_serial = serial2 } = + let i = compare name1 name2 in + if i <> 0 then i + else + compare_big_int (big_int_of_string serial1) (big_int_of_string serial2) + in + Array.sort cmp jobs; + + Array.iter ( + fun { Whenproto_aux.job_serial = serial; job_name = name; + job_tmpdir = tmpdir; job_start_time = time } -> + printf "%s %s\n\trunning in: %s\n\tstarted at: %s\n" + serial name tmpdir + (string_of_time_t ~localtime:true (Int64.to_float time)) + ) jobs + +and cancel_job serial = + let client = start_client () in + (match Whenproto_clnt.When.V1.cancel_job client serial with + | `ok -> () + | `error msg -> + eprintf "whenjobs: cancel-job: %s\n" msg; + suggest_check_server_logs (); + exit 1 + ); + stop_client client + +and start_job name = + let client = start_client () in + (match Whenproto_clnt.When.V1.start_job client name with + | `ok -> () + | `error msg -> + eprintf "whenjobs: start-job: %s\n" msg; + suggest_check_server_logs (); + exit 1 + ); + stop_client client + and unused_error args op = if args <> [] then ( eprintf "whenjobs %s: unused parameters on the command line.\n" op; diff --git a/tools/whenjobs.pod b/tools/whenjobs.pod index 1c96648..2958eba 100644 --- a/tools/whenjobs.pod +++ b/tools/whenjobs.pod @@ -24,6 +24,12 @@ Start and stop the per-user daemon: whenjobs --daemon-status whenjobs --daemon-restart +Examine running jobs: + + whenjobs --jobs + whenjobs --cancel serial + whenjobs --start "name" + =head1 DESCRIPTION Whenjobs is a powerful but simple replacement for cron. It lets you @@ -83,6 +89,14 @@ The act of setting a variable (using I<--set>) can trigger jobs to run. =over 4 +=item B<--cancel> serial + +Cancel the job with the given serial number. + +Use I<--jobs> to list running jobs along with their serial numbers. +The serial number is also available in the job script (as +C<$JOBSERIAL>) and in the log file. + =item B<--daemon-start> =item B<--daemon-stop> @@ -112,6 +126,13 @@ C is used. Print the value of a variable. +=item B<--jobs> + +List all running jobs. + +Note that it is possible for the same job to be running more than once +(for example, a periodic job that takes longer than the period to run). + =item B<-l> =item B<--list> @@ -128,6 +149,13 @@ source, eg: whenjobs --lib $builddir/lib -e +=item B<--start> "job name" + +Start the job immediately and unconditionally. + +This runs the job even if its normal preconditions are not met. This +may cause unexpected results, so use with caution. + =item B<--set> variable value =item B<--type> bool|int|float|string|unit @@ -664,11 +692,12 @@ This structure is passed to cleanup functions. It has the following fields: type result = { - res_job_name : string; # job name - res_serial : big_int; # job serial (same as $JOBSERIAL) - res_code : int; # return code from the shell script - res_tmpdir : string; # temporary directory script ran in - res_output : string; # filename of stdout/stderr output + res_job_name : string; # job name + res_serial : big_int; # job serial (same as $JOBSERIAL) + res_code : int; # return code from the shell script + res_tmpdir : string; # temporary directory script ran in + res_output : string; # filename of stdout/stderr output + res_start_time : float; # when the job started } =back -- 1.8.3.1