open Whenutils
+open Big_int
open Unix
open Printf
+(* Binding to the C stub "whenjobs__exit"; see [exit.c].  The ['a]
+ * result type lets a call appear in any expression position, as in
+ * the forked child in [run_job] once exec has failed.
+ * NOTE(review): presumably this wraps _exit(2) and never returns --
+ * confirm against exit.c.
+ *)
+external _exit : int -> 'a = "whenjobs__exit"
+
(* All jobs that are loaded. Maps name -> [job] structure. *)
let jobs = ref StringMap.empty
*)
let variables : variables ref = ref StringMap.empty
-(* Last time that an every job ran. See schedule_next_everyjob below. *)
-let last_t = ref (time ())
-
(* $HOME/.whenjobs *)
let jobsdir = ref ""
+(* Jobs that are running; map of PID -> (job, temporary directory).
+ * [run_job] adds an entry after forking and [handle_sigchld] removes
+ * it when the child exits.  Note that the job may no longer exist
+ * *OR* it may have been renamed, eg. if the jobs file was reloaded.
+ *)
+let running = ref IntMap.empty
+
(* Was debugging requested on the command line? *)
let debug = ref false
+(* The RPC server handle.  [None] until [init] has created the
+ * server, and reset to [None] after [proc_exit_daemon] has stopped it.
+ *)
+let server = ref None
+
let esys = Unixqueue.standard_event_system ()
-let timer_group = ref None
let rec init j d =
jobsdir := j;
let addr = sprintf "%s/socket" !jobsdir in
(try unlink addr with Unix_error _ -> ());
- ignore (
+ server := Some (
Whenproto_srv.When.V1.create_server
~proc_reload_file
~proc_set_variable
~proc_get_variable
~proc_get_variable_names
+ ~proc_exit_daemon
(Rpc_server.Unix addr)
Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
Rpc.Socket
esys
- )
+ );
+
+ (* Handle SIGCHLD to clean up jobs. *)
+ Sys.set_signal Sys.sigchld (Sys.Signal_handle handle_sigchld);
+
+ (* Initialize the variables. XXX Eventually this will be saved
+ * and loaded from a persistent store.
+ *)
+ variables := StringMap.add "JOBSERIAL" (T_int zero_big_int) !variables
and proc_reload_file () =
if !debug then Syslog.notice "remote call: reload_file";
+(* RPC: set variable [name] to [value].  The name is validated first:
+ * it must not be "JOBSERIAL", must be non-empty, must start with an
+ * alphabetic character or underscore, and may otherwise contain only
+ * alphanumeric characters and underscores.  On success the variable
+ * is stored, the when-jobs listed for it in [dependencies] are
+ * re-evaluated, and [`ok] is returned.  Any [Failure] raised inside
+ * the [try] -- a validation failure or otherwise -- is returned to
+ * the caller as [`error msg] instead of propagating.
+ *)
and proc_set_variable (name, value) =
if !debug then Syslog.notice "remote call: set_variable %s" name;
- let value = variable_of_rpc value in
- variables := StringMap.add name value !variables;
+ try
+ (* Don't permit certain names. *)
+ if name = "JOBSERIAL" then
+ failwith "JOBSERIAL variable cannot be set";
+
+ let len = String.length name in
+ if len = 0 then
+ failwith "variable name is an empty string";
+ if name.[0] <> '_' && not (isalpha name.[0]) then
+ failwith "variable name must start with alphabetic character or underscore";
+
+ let rec loop i =
+ if i >= len then ()
+ else if name.[i] <> '_' && not (isalnum name.[i]) then
+ failwith "variable name contains non-alphanumeric non-underscore character"
+ else loop (i+1)
+ in
+ loop 1;
+
+ let value = variable_of_rpc value in
+ variables := StringMap.add name value !variables;
+
+ (* Which jobs need to be re-evaluated? *)
+ let jobnames = try StringMap.find name !dependencies with Not_found -> [] in
+ reevaluate_whenjobs jobnames;
- (* Which jobs need to be re-evaluated? *)
- let jobnames = try StringMap.find name !dependencies with Not_found -> [] in
- reevaluate_whenjobs jobnames
+ `ok
+ with
+ Failure msg -> `error msg
and proc_get_variable name =
if !debug then Syslog.notice "remote call: get_variable %s" name;
Array.sort compare vars;
vars
+(* RPC: ask the daemon to stop serving.  If a server handle is held
+ * in [server] it is stopped gracefully, the handle is dropped and
+ * [`ok] is returned; if there is no handle the call returns
+ * [`error] with an explanatory message.
+ *)
+and proc_exit_daemon () =
+  if !debug then Syslog.notice "remote call: exit_daemon";
+
+  match !server with
+  | Some srv ->
+    (* Stop first, then forget the handle so a repeated call reports
+     * the missing handle rather than stopping twice.
+     *)
+    Rpc_server.stop_server ~graceful:true srv;
+    server := None;
+    `ok
+  | None ->
+    `error "exit_daemon: no server handle"
+
(* Reload the jobs file. *)
and reload_file () =
let file = sprintf "%s/jobs.cmo" !jobsdir in
dependencies := map in
(* Re-evaluate all when jobs. *)
- reevaluate_whenjobs (StringMap.keys !jobs);
+ reevaluate_whenjobs ~onload:true (StringMap.keys !jobs);
(* Schedule the next every job to run. *)
- last_t := time ();
schedule_next_everyjob ()
(* Re-evaluate each named when-statement job, in a loop until we reach
* a fixpoint. Run those that need to be run. every-statement jobs
* are ignored here.
*)
-and reevaluate_whenjobs jobnames =
+and reevaluate_whenjobs ?(onload=false) jobnames =
let rec loop set jobnames =
let set' =
List.fold_left (
assert (jobname = job.job_name);
let r, job' =
- try job_evaluate job !variables
+ try job_evaluate job !variables onload
with Invalid_argument err | Failure err ->
Syslog.error "error evaluating job %s (at %s): %s"
jobname (Camlp4.PreCast.Ast.Loc.to_string job.job_loc) err;
let set = loop StringSet.empty jobnames in
let jobnames = StringSet.elements set in
(* Ensure the jobs always run in predictable (name) order. *)
- let jobnames = List.sort compare jobnames in
+ let jobnames = List.sort compare_jobnames jobnames in
List.iter run_job
(List.map (fun jobname -> StringMap.find jobname !jobs) jobnames)
(* Schedule the next every-statement job to run, if there is one. We
* look at the every jobs, work out the time that each must run at,
* pick the job(s) which must run soonest, and schedule a timer to run
- * them. When the timer fires, it runs those jobs, then call this
- * function again. 'last_t' is the base time used for scheduling (or
- * the time that the file was last reloaded).
+ * them. When the timer fires, it runs those jobs, then calls this
+ * function again.
*)
and schedule_next_everyjob () =
+ let t = time () in
+
(* Get only everyjobs. *)
let jobs = StringMap.values !jobs in
let jobs = filter_map (
(* Map everyjob to next time it must run. *)
let jobs = List.map (
fun (job, period) ->
- let t' = next_periodexpr !last_t period in
- assert (t' > !last_t); (* serious bug in next_periodexpr if false *)
+ let t' = next_periodexpr t period in
+ assert (t' > t); (* serious bug in next_periodexpr if false *)
job, t'
) jobs in
let t, jobs = pick jobs in
if t > 0. then (
- last_t := t;
-
if jobs <> [] then (
+ (* Ensure the jobs always run in predictable (name) order. *)
+ let jobs =
+ List.sort (fun {job_name = a} {job_name = b} -> compare_jobnames a b)
+ jobs in
+
if !debug then
Syslog.notice "scheduling job(s) %s to run at %s"
(String.concat ", " (List.map (fun { job_name = name } -> name) jobs))
(string_of_time_t t);
(* Schedule them to run at time t. *)
- let g = new_timer_group () in
- let w = Unixqueue.new_wait_id esys in
+ let g = Unixqueue.new_group esys in
let t_diff = t -. Unix.time () in
let t_diff = if t_diff < 0. then 0. else t_diff in
- Unixqueue.add_resource esys g (Unixqueue.Wait w, t_diff);
- let run_jobs _ _ _ =
+ let run_jobs () =
+ Unixqueue.clear esys g; (* Delete the timer. *)
List.iter run_job jobs;
- delete_timer ();
- schedule_next_everyjob ();
+ schedule_next_everyjob ()
in
- Unixqueue.add_handler esys g run_jobs;
+ Unixqueue.weak_once esys g t_diff run_jobs;
)
)
(1900+tm.tm_year) (1+tm.tm_mon) tm.tm_mday
tm.tm_hour tm.tm_min tm.tm_sec
-and new_timer_group () =
- delete_timer ();
- let g = Unixqueue.new_group esys in
- timer_group := Some g;
- g
-
-and delete_timer () =
- match !timer_group with
- | None -> ()
- | Some g ->
- Unixqueue.clear esys g;
- timer_group := None
-
+(* Run a job.  Increments the JOBSERIAL variable, creates a fresh
+ * temporary directory, then forks.  The child changes into that
+ * directory, exports every variable plus $JOBNAME into its
+ * environment, writes the job's shell fragment to a file called
+ * "script" there and executes it with $SHELL (default /bin/sh).  The
+ * parent records PID -> (job, directory) in [running] so that the
+ * directory can be cleaned up when the child exits.
+ *
+ * The child never returns from this function: whatever goes wrong
+ * while preparing or executing the script, it logs the error and
+ * calls [_exit 1].  Otherwise an exception raised in the child would
+ * unwind into a second copy of the daemon's event loop.
+ *
+ * In the parent, exceptions from [tmpdir] and [fork] propagate to the
+ * caller; if [fork] fails the temporary directory is removed first.
+ *)
and run_job job =
- Syslog.notice "running %s" job.job_name;
- () (* XXX *)
+  let () =
+    (* Increment JOBSERIAL. *)
+    let serial =
+      match StringMap.find "JOBSERIAL" !variables with
+      | T_int serial ->
+        let serial = succ_big_int serial in
+        variables := StringMap.add "JOBSERIAL" (T_int serial) !variables;
+        serial
+      | _ -> assert false in
+
+    Syslog.notice "running %s (JOBSERIAL=%s)"
+      job.job_name (string_of_big_int serial) in
+
+  (* Create a temporary directory.  The current directory of the job
+   * will be in this directory.  The directory is removed when the
+   * child process exits.
+   *)
+  let dir = tmpdir () in
+
+  (* If we cannot fork there will be no child to clean up after, so
+   * remove the directory now and let the caller see the error.
+   *)
+  let pid =
+    try fork ()
+    with exn ->
+      cleanup_job job dir;
+      raise exn in
+
+  if pid = 0 then ( (* child process running the job *)
+    (try
+       chdir dir;
+
+       (* Set environment variables corresponding to each variable. *)
+       StringMap.iter
+         (fun name value -> putenv name (string_of_variable value))
+         !variables;
+
+       (* Set the $JOBNAME environment variable. *)
+       putenv "JOBNAME" job.job_name;
+
+       (* Create a temporary file containing the shell script fragment. *)
+       let script = dir // "script" in
+       let chan = open_out script in
+       output_string chan job.job_script.sh_script;
+       close_out chan;
+       chmod script 0o700;
+
+       let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
+
+       (* Execute the shell script.  execvp only returns by raising. *)
+       (try execvp shell [| shell; "-c"; script |]
+        with Unix_error (err, fn, _) ->
+          Syslog.error "%s failed: %s: %s" fn script (error_message err)
+       )
+     with
+     (* Anything that failed before the exec: log it and fall through
+      * to _exit below rather than returning into the daemon's code.
+      *)
+     | Unix_error (err, fn, arg) ->
+       Syslog.error "%s: %s failed: %s: %s"
+         job.job_name fn arg (error_message err)
+     | exn ->
+       Syslog.error "%s: %s" job.job_name (Printexc.to_string exn)
+    );
+    _exit 1
+  );
+
+  (* Remember this PID, the job and the temporary directory, so we
+   * can clean up when the child exits.
+   * NOTE(review): if the child exits and SIGCHLD is handled before
+   * this line runs, handle_sigchld will not find the PID and the
+   * directory is left behind -- consider blocking SIGCHLD around the
+   * fork.
+   *)
+  running := IntMap.add pid (job, dir) !running
+
+(* Create a fresh directory with mode 0700 under
+ * [Filename.temp_dir_name] and return its path.  The directory name
+ * is "whenjobs" followed by the hex digest of 16 bytes read from
+ * /dev/urandom.
+ *
+ * Raises [Sys_error] or [End_of_file] if /dev/urandom cannot be
+ * opened or read, and [Unix_error] if the directory cannot be
+ * created.
+ *)
+and tmpdir () =
+  let chan = open_in "/dev/urandom" in
+  let data = String.create 16 in
+  (* Close the channel on the failure path as well, so that a failed
+   * read does not leak a file descriptor in the long-running daemon.
+   *)
+  (try really_input chan data 0 (String.length data)
+   with exn ->
+     close_in_noerr chan;
+     raise exn);
+  close_in chan;
+  let data = Digest.to_hex (Digest.string data) in
+  let dir = Filename.temp_dir_name // sprintf "whenjobs%s" data in
+  mkdir dir 0o700;
+  dir
+
+(* This is called when a job (child process) exits.
+ *
+ * Several children exiting close together may be reported by a
+ * single SIGCHLD, so keep calling waitpid until it reports that no
+ * further child has exited (or fails, eg. because there are no
+ * children at all).  Each reaped PID that is found in [running] is
+ * removed from the map and its temporary directory is cleaned up;
+ * PIDs that are not in the map are reaped and otherwise ignored.
+ *)
+and handle_sigchld _ =
+  let rec reap () =
+    let reaped =
+      try Some (waitpid [WNOHANG] 0)
+      with Unix_error _ -> None in
+    match reaped with
+    | None -> ()
+    | Some (pid, _) ->
+      (* With WNOHANG, pid = 0 means children exist but none of them
+       * has exited yet.
+       *)
+      if pid > 0 then (
+        (try
+           (* Look up the PID in the running jobs map. *)
+           let job, dir = IntMap.find pid !running in
+           running := IntMap.remove pid !running;
+           cleanup_job job dir
+         with Unix_error _ | Not_found -> ());
+        reap ()
+      )
+  in
+  reap ()
+
+(* Remove the temporary directory [dir] that was created for [job],
+ * by running "rm -rf" on it.  A failure to remove the directory is
+ * logged but not raised.
+ *)
+and cleanup_job job dir =
+  (* Filename.quote makes the path safe for the shell whatever
+   * characters it contains (the leading part comes from
+   * Filename.temp_dir_name, which is not under our control).
+   *)
+  let cmd = sprintf "rm -rf %s" (Filename.quote dir) in
+  let status = Sys.command cmd in
+  if status <> 0 then
+    Syslog.error "%s: could not remove %s (rm exit status %d)"
+      job.job_name dir status
+
+(* Intelligent comparison of job names.
+ *
+ * When both names have the form "job$<integer>" they are ordered by
+ * the integer, so that "job$2" sorts before "job$10".  In every other
+ * case (either name lacks the prefix, or its suffix is not accepted
+ * by int_of_string) the names are compared as plain strings.
+ * Returns a negative, zero or positive result like [compare].
+ *)
+and compare_jobnames name1 name2 =
+  (* The integer after the "job$" prefix, if the name has that form. *)
+  let serial_of name =
+    let len = String.length name in
+    if len > 4 && String.sub name 0 4 = "job$" then
+      (try Some (int_of_string (String.sub name 4 (len-4)))
+       with Failure _ -> None)
+    else None
+  in
+  match serial_of name1, serial_of name2 with
+  | Some i1, Some i2 -> compare i1 i2
+  | Some _, None | None, Some _ | None, None -> compare name1 name2
+(* Run the event loop on [esys].
+ * NOTE(review): presumably this returns only once the event system
+ * has no work left -- confirm against the Unixqueue documentation.
+ *)
let main_loop () =
Unixqueue.run esys