Add commands to list, cancel, start jobs.
authorRichard W.M. Jones <rjones@redhat.com>
Thu, 23 Feb 2012 22:09:36 +0000 (22:09 +0000)
committerRichard W.M. Jones <rjones@redhat.com>
Thu, 23 Feb 2012 22:43:44 +0000 (22:43 +0000)
daemon/daemon.ml
lib/whenexpr.ml
lib/whenexpr.mli
lib/whenproto.x
lib/whenstate.ml
lib/whenstate.mli
lib/whenutils.ml
lib/whenutils.mli
tools/whenjobs.ml
tools/whenjobs.pod

index ddbb9cd..c3e1abd 100644 (file)
@@ -32,11 +32,14 @@ let jobsdir = ref ""
 (* The state. *)
 let state = ref Whenstate.empty
 
-(* Jobs that are running: a map of PID -> (job, tmpdir, serial).
+(* Jobs that are running: a map of PID -> (job, tmpdir, serial, start_time).
  * Note that the job may no longer exist *OR* it may have been renamed,
  * eg. if the jobs file was reloaded.
  *)
-let running = ref IntMap.empty
+let runningmap = ref IntMap.empty
+
+(* Serial numbers of running jobs.  Map of serial -> PID (in runningmap). *)
+let serialmap = ref BigIntMap.empty
 
 (* Was debugging requested on the command line? *)
 let debug = ref false
@@ -72,6 +75,9 @@ let rec init j d =
       ~proc_get_variable
       ~proc_get_variable_names
       ~proc_exit_daemon
+      ~proc_get_jobs
+      ~proc_cancel_job
+      ~proc_start_job
       (Rpc_server.Unix addr)
       Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
       Rpc.Socket
@@ -133,6 +139,34 @@ and proc_exit_daemon () =
     server := None;
     `ok
 
+and proc_get_jobs () =
+  let running = Array.of_list (IntMap.values !runningmap) in
+  Array.map (
+    fun (job, dir, serial, start_time) ->
+      { Whenproto_aux.job_name = job.job_name;
+        job_serial = string_of_big_int serial;
+        job_tmpdir = dir; job_start_time = Int64.of_float start_time }
+  ) running
+
+and proc_cancel_job serial =
+  try
+    let serial = big_int_of_string serial in
+    let pid = BigIntMap.find serial !serialmap in
+    kill pid 15;
+    `ok
+  with
+  | Not_found -> `error "job not found"
+  | exn -> `error (Printexc.to_string exn)
+
+and proc_start_job jobname =
+  try
+    let job = Whenstate.get_job !state jobname in
+    run_job job;
+    `ok
+  with
+  | Not_found -> `error "job not found"
+  | exn -> `error (Printexc.to_string exn)
+
 (* Reload the jobs file. *)
 and reload_file () =
   let file = sprintf "%s/jobs.cmo" !jobsdir in
@@ -344,7 +378,8 @@ and run_job job =
   (* Remember this PID, the job and the temporary directory, so we
    * can clean up when the child exits.
    *)
-  running := IntMap.add pid (job, dir, serial) !running
+  runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
+  serialmap := BigIntMap.add serial pid !serialmap
 
 and tmpdir () =
   let chan = open_in "/dev/urandom" in
@@ -362,13 +397,14 @@ and handle_sigchld _ =
     let pid, status = waitpid [WNOHANG] 0 in
     if pid > 0 then (
       (* Look up the PID in the running jobs map. *)
-      let job, dir, serial = IntMap.find pid !running in
-      running := IntMap.remove pid !running;
-      cleanup_job job dir serial status
+      let job, dir, serial, time = IntMap.find pid !runningmap in
+      runningmap := IntMap.remove pid !runningmap;
+      serialmap := BigIntMap.remove serial !serialmap;
+      cleanup_job job dir serial time status
     )
   with Unix_error _ | Not_found -> ()
 
-and cleanup_job job dir serial status =
+and cleanup_job job dir serial time status =
   (* If there is a cleanup function, run it. *)
   (match job.job_cleanup with
   | None -> ()
@@ -382,7 +418,8 @@ and cleanup_job job dir serial status =
       res_serial = serial;
       res_code = code;
       res_tmpdir = dir;
-      res_output = dir // "output.txt"
+      res_output = dir // "output.txt";
+      res_start_time = time
     } in
     try cleanup result
     with
index df1665b..6e12a5d 100644 (file)
@@ -89,6 +89,7 @@ type result = {
   res_code : int;
   res_tmpdir : string;
   res_output : string;
+  res_start_time : float;
 }
 
 type cleanup = result -> unit
index f1b8e2e..91bb134 100644 (file)
@@ -66,6 +66,7 @@ type result = {
   res_code : int;                       (** Return code from the script. *)
   res_tmpdir : string;                  (** Temporary directory. *)
   res_output : string;                  (** Filename of output from job. *)
+  res_start_time : float;               (** When the job started. *)
 }
 (** Result of the run of a job. *)
 
index 7988c78..280265d 100644 (file)
 /* Maximum lengths and some useful typedefs. */
 const MAX_VARIABLE_NAME_LENGTH = 256;
 const MAX_VARIABLE_VALUE_LENGTH = 65536;
+const MAX_JOB_NAME_LENGTH = 256;
 const MAX_BIG_INT_LENGTH = 64; /* when encoded as a string */
+const MAX_PATH_LENGTH = 4096;
 
 typedef string variable_name<MAX_VARIABLE_NAME_LENGTH>;
 typedef string string_value<MAX_VARIABLE_VALUE_LENGTH>;
+typedef string job_name<MAX_JOB_NAME_LENGTH>;
 typedef string string_big_int<MAX_BIG_INT_LENGTH>;
+typedef string path<MAX_PATH_LENGTH>;
 
 typedef variable_name variable_name_list<>;
 
@@ -66,6 +70,15 @@ union variable switch (variable_type t) {
    double f;                    /* C 'double' maps to an OCaml 'float' */
 };
 
+struct job {
+  job_name job_name;
+  string_big_int job_serial;
+  path job_tmpdir;
+  hyper job_start_time;
+};
+
+typedef job job_list<>;
+
 /* The API of the daemon. */
 program When {
   version V1 {
@@ -74,5 +87,8 @@ program When {
     variable get_variable (variable_name) = 3;
     variable_name_list get_variable_names (void) = 4;
     status exit_daemon (void) = 5;
+    job_list get_jobs (void) = 6;
+    status cancel_job (string_big_int) = 7;
+    status start_job (job_name) = 8;
   } = 1;
 } = 0x20008081;
index ddbc302..fe53a16 100644 (file)
@@ -115,7 +115,7 @@ let get_everyjobs t =
   List.filter (function { job_cond = Every_job _ } -> true | _ -> false) t.jobs
 
 let get_job t jobname =
-  try StringMap.find jobname t.jobmap with Not_found -> assert false
+  StringMap.find jobname t.jobmap
 
 let evaluate_whenjob ?(onload = false) t job =
   match job with
index a8de795..084cd49 100644 (file)
@@ -70,7 +70,7 @@ val get_everyjobs : t -> Whenexpr.job list
 (** Return all of the when-jobs / every-jobs. *)
 
 val get_job : t -> string -> Whenexpr.job
-(** Return the named job. *)
+(** Return the named job, or raise [Not_found]. *)
 
 val evaluate_whenjob : ?onload:bool -> t -> Whenexpr.job -> bool * t
 (** This evaluates the whenjob and returns [true] iff the whenjob
index 583d030..ca26755 100644 (file)
@@ -37,6 +37,12 @@ module IntMap = struct
   let values m = fold (fun _ v vs -> v :: vs) m []
 end
 
+module BigIntMap = struct
+  include Map.Make (struct type t = big_int let compare = compare_big_int end)
+  let keys m = fold (fun k _ ks -> k :: ks) m []
+  let values m = fold (fun _ v vs -> v :: vs) m []
+end
+
 module StringSet = Set.Make (String)
 
 let (//) = Filename.concat
index 61cd4f8..00b2ff1 100644 (file)
@@ -84,6 +84,39 @@ module IntMap : sig
 end
 (** A map from int to any type. *)
 
+module BigIntMap : sig
+  type key = Big_int.big_int
+  type 'a t
+  val empty : 'a t
+  val is_empty : 'a t -> bool
+  val mem : key -> 'a t -> bool
+  val add : key -> 'a -> 'a t -> 'a t
+  (*val singleton : key -> 'a -> 'a t*)
+  val remove : key -> 'a t -> 'a t
+  (*val merge :
+    (key -> 'a option -> 'b option -> 'c option) -> 'a t -> 'b t -> 'c t*)
+  val compare : ('a -> 'a -> int) -> 'a t -> 'a t -> int
+  val equal : ('a -> 'a -> bool) -> 'a t -> 'a t -> bool
+  val iter : (key -> 'a -> unit) -> 'a t -> unit
+  val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b
+  (*val for_all : (key -> 'a -> bool) -> 'a t -> bool
+  val exists : (key -> 'a -> bool) -> 'a t -> bool
+  val filter : (key -> 'a -> bool) -> 'a t -> 'a t
+  val partition : (key -> 'a -> bool) -> 'a t -> 'a t * 'a t
+  val cardinal : 'a t -> int
+  val bindings : 'a t -> (key * 'a) list
+  val min_binding : 'a t -> key * 'a
+  val max_binding : 'a t -> key * 'a
+  val choose : 'a t -> key * 'a
+  val split : key -> 'a t -> 'a t * 'a option * 'a t*)
+  val find : key -> 'a t -> 'a
+  val map : ('a -> 'b) -> 'a t -> 'b t
+  val mapi : (key -> 'a -> 'b) -> 'a t -> 'b t
+  val keys : 'a t -> key list
+  val values : 'a t -> 'a list
+end
+(** A map from big_int to any type. *)
+
 module StringSet : sig
   type elt = String.t
   type t = Set.Make(String).t
index 8472672..bd26545 100644 (file)
@@ -20,6 +20,8 @@ open Big_int
 open Unix
 open Printf
 
+open Whenutils
+
 (* Ensures that Whentools module is linked to the whenjobs tool. *)
 let _ = Whentools.set_variable
 
@@ -79,6 +81,7 @@ let rec main () =
   in
 
   let argspec = Arg.align [
+    "--cancel", Arg.Unit (set_mode `Cancel), " Cancel a job";
     "--daemon-start", Arg.Unit (set_mode `Daemon_start), " Start the daemon";
     "--daemon-status", Arg.Unit (set_mode `Daemon_status), " Display the status of the daemon";
     "--daemon-stop", Arg.Unit (set_mode `Daemon_stop), " Stop the daemon";
@@ -86,10 +89,12 @@ let rec main () =
     "-e", Arg.Unit (set_mode `Edit), " Edit and upload the script";
     "--edit", Arg.Unit (set_mode `Edit), " Edit and upload the script";
     "--get", Arg.Unit (set_mode `Get), " Display the variable";
+    "--jobs", Arg.Unit (set_mode `Jobs), " List running jobs";
     "-l", Arg.Unit (set_mode `List), " List the script";
     "--list", Arg.Unit (set_mode `List), " List the script";
     "--lib", Arg.Set_string libdir, "dir Specify directory that contains pa_when.cmo";
     "--set", Arg.Unit (set_mode `Set), " Set the variable";
+    "--start", Arg.Unit (set_mode `Start), " Start a job manually";
     "--type", Arg.Set_string typ, "bool|int|float|string|unit Set the variable type";
     "--upload", Arg.Unit (set_mode `Upload), " Upload the script";
     "--variables", Arg.Unit (set_mode `Variables), " Display all variables and values";
@@ -194,6 +199,27 @@ Options:
     unused_error args "--daemon-status";
     daemon_status ()
 
+  | Some `Jobs ->
+    unused_error args "--jobs";
+    jobs ()
+
+  | Some `Cancel ->
+    if List.length args != 1 then (
+      eprintf "whenjobs --cancel serial\n";
+      suggest_help ();
+      exit 1
+    );
+    cancel_job (List.hd args)
+
+  | Some `Start ->
+    if List.length args != 1 then (
+      eprintf "whenjobs --start jobname\n";
+      eprintf "If 'value' contains spaces, you may need to quote it.\n";
+      suggest_help ();
+      exit 1
+    );
+    start_job (List.hd args)
+
 and edit_file () =
   (* If there is no initial file, create an empty one containing the
    * tutorial.
@@ -351,6 +377,50 @@ and daemon_restart () =
 and daemon_status () =
   assert false
 
+and jobs () =
+  let client = start_client () in
+  let jobs = Whenproto_clnt.When.V1.get_jobs client () in
+  stop_client client;
+
+  let cmp { Whenproto_aux.job_name = name1; job_serial = serial1 }
+      { Whenproto_aux.job_name = name2; job_serial = serial2 } =
+    let i = compare name1 name2 in
+    if i <> 0 then i
+    else
+      compare_big_int (big_int_of_string serial1) (big_int_of_string serial2)
+  in
+  Array.sort cmp jobs;
+
+  Array.iter (
+    fun { Whenproto_aux.job_serial = serial; job_name = name;
+          job_tmpdir = tmpdir; job_start_time = time } ->
+      printf "%s %s\n\trunning in: %s\n\tstarted at: %s\n"
+        serial name tmpdir
+        (string_of_time_t ~localtime:true (Int64.to_float time))
+  ) jobs
+
+and cancel_job serial =
+  let client = start_client () in
+  (match Whenproto_clnt.When.V1.cancel_job client serial with
+  | `ok -> ()
+  | `error msg ->
+    eprintf "whenjobs: cancel-job: %s\n" msg;
+    suggest_check_server_logs ();
+    exit 1
+  );
+  stop_client client
+
+and start_job name =
+  let client = start_client () in
+  (match Whenproto_clnt.When.V1.start_job client name with
+  | `ok -> ()
+  | `error msg ->
+    eprintf "whenjobs: start-job: %s\n" msg;
+    suggest_check_server_logs ();
+    exit 1
+  );
+  stop_client client
+
 and unused_error args op =
   if args <> [] then (
     eprintf "whenjobs %s: unused parameters on the command line.\n" op;
index 1c96648..2958eba 100644 (file)
@@ -24,6 +24,12 @@ Start and stop the per-user daemon:
  whenjobs --daemon-status
  whenjobs --daemon-restart
 
+Examine running jobs:
+
+ whenjobs --jobs
+ whenjobs --cancel serial
+ whenjobs --start "name"
+
 =head1 DESCRIPTION
 
 Whenjobs is a powerful but simple replacement for cron.  It lets you
@@ -83,6 +89,14 @@ The act of setting a variable (using I<--set>) can trigger jobs to run.
 
 =over 4
 
+=item B<--cancel> serial
+
+Cancel the job with the given serial number.
+
+Use I<--jobs> to list running jobs along with their serial numbers.
+The serial number is also available in the job script (as
+C<$JOBSERIAL>) and in the log file.
+
 =item B<--daemon-start>
 
 =item B<--daemon-stop>
@@ -112,6 +126,13 @@ C<vi> is used.
 
 Print the value of a variable.
 
+=item B<--jobs>
+
+List all running jobs.
+
+Note that it is possible for the same job to be running more than once
+(for example, a periodic job that takes longer than the period to run).
+
 =item B<-l>
 
 =item B<--list>
@@ -128,6 +149,13 @@ source, eg:
 
  whenjobs --lib $builddir/lib -e
 
+=item B<--start> "job name"
+
+Start the job immediately and unconditionally.
+
+This runs the job even if its normal preconditions are not met.  This
+may cause unexpected results, so use with caution.
+
 =item B<--set> variable value
 
 =item B<--type> bool|int|float|string|unit
@@ -664,11 +692,12 @@ This structure is passed to cleanup functions.  It has the following
 fields:
 
  type result = {
-   res_job_name : string; # job name
-   res_serial : big_int;  # job serial (same as $JOBSERIAL)
-   res_code : int;        # return code from the shell script
-   res_tmpdir : string;   # temporary directory script ran in
-   res_output : string;   # filename of stdout/stderr output
+   res_job_name : string;  # job name
+   res_serial : big_int;   # job serial (same as $JOBSERIAL)
+   res_code : int;         # return code from the shell script
+   res_tmpdir : string;    # temporary directory script ran in
+   res_output : string;    # filename of stdout/stderr output
+   res_start_time : float; # when the job started
  }
 
 =back