open Whenutils
+open Big_int
open Unix
open Printf
+(* See [exit.c]. *)
+external _exit : int -> 'a = "whenjobs__exit"
+
(* All jobs that are loaded. Maps name -> [job] structure. *)
let jobs = ref StringMap.empty
(* $HOME/.whenjobs *)
let jobsdir = ref ""
+(* Jobs that are running; map of PID -> (job, temporary directory),
+ * as recorded by [run_job]. Note that the job may no longer exist
+ * *OR* it may have been renamed, eg. if the jobs file was reloaded.
+ *)
+let running = ref IntMap.empty
+
(* Was debugging requested on the command line? *)
let debug = ref false
+(* The RPC server handle: set by [init], reset to [None] by
+ * [proc_exit_daemon] once the server has been asked to stop.
+ *)
+let server = ref None
+
+(* The single event system shared by the RPC server, the every-job
+ * timers and [main_loop].
+ *)
let esys = Unixqueue.standard_event_system ()
+(* Start the daemon's services: remove any stale socket file at
+ * [!jobsdir]/socket, create the When.V1 RPC server on that Unix
+ * domain socket attached to [esys] and remember it in [server],
+ * install the SIGCHLD handler, and seed the JOBSERIAL variable with
+ * zero.
+ * NOTE(review): the uses of the parameters [j] and [d] are not
+ * visible in this hunk -- presumably they initialise [jobsdir] and
+ * [debug]; confirm against the full file.
+ *)
let rec init j d =
let addr = sprintf "%s/socket" !jobsdir in
(try unlink addr with Unix_error _ -> ());
- ignore (
+ server := Some (
Whenproto_srv.When.V1.create_server
~proc_reload_file
~proc_set_variable
~proc_get_variable
~proc_get_variable_names
+ ~proc_exit_daemon
(Rpc_server.Unix addr)
Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
Rpc.Socket
esys
- )
+ );
+
+ (* Handle SIGCHLD to clean up jobs. *)
+ Sys.set_signal Sys.sigchld (Sys.Signal_handle handle_sigchld);
+
+ (* Initialize the variables. XXX Eventually this will be saved
+ * and loaded from a persistent store.
+ *)
+ variables := StringMap.add "JOBSERIAL" (T_int zero_big_int) !variables
and proc_reload_file () =
if !debug then Syslog.notice "remote call: reload_file";
+(* RPC handler: set variable [name] to [value].
+ *
+ * The name is validated first: JOBSERIAL is reserved, the name must
+ * be non-empty, must start with a letter or underscore, and may
+ * otherwise contain only alphanumeric characters and underscores.
+ * On success the converted value is stored in [variables], the
+ * when-jobs listed in [dependencies] for this name are re-evaluated,
+ * and the result is `ok.
+ *
+ * Any [Failure] raised inside the [try] -- the validation errors
+ * below, but also one escaping from [variable_of_rpc] or
+ * [reevaluate_whenjobs] -- is returned to the caller as `error with
+ * the failure message. Other exceptions propagate.
+ *)
and proc_set_variable (name, value) =
if !debug then Syslog.notice "remote call: set_variable %s" name;
- let value = variable_of_rpc value in
- variables := StringMap.add name value !variables;
+ try
+ (* Don't permit certain names. *)
+ if name = "JOBSERIAL" then
+ failwith "JOBSERIAL variable cannot be set";
+
+ let len = String.length name in
+ if len = 0 then
+ failwith "variable name is an empty string";
+ if name.[0] <> '_' && not (isalpha name.[0]) then
+ failwith "variable name must start with alphabetic character or underscore";
+
+ (* Check the remaining characters; index 0 was checked above. *)
+ let rec loop i =
+ if i >= len then ()
+ else if name.[i] <> '_' && not (isalnum name.[i]) then
+ failwith "variable name contains non-alphanumeric non-underscore character"
+ else loop (i+1)
+ in
+ loop 1;
- (* Which jobs need to be re-evaluated? *)
- let jobnames = try StringMap.find name !dependencies with Not_found -> [] in
- let jobs = reevaluate_jobs jobnames in
- run_jobs jobs
+ let value = variable_of_rpc value in
+ variables := StringMap.add name value !variables;
+
+ (* Which jobs need to be re-evaluated? *)
+ let jobnames = try StringMap.find name !dependencies with Not_found -> [] in
+ reevaluate_whenjobs jobnames;
+
+ `ok
+ with
+ Failure msg -> `error msg
and proc_get_variable name =
if !debug then Syslog.notice "remote call: get_variable %s" name;
Array.sort compare vars;
vars
+(* RPC handler: ask the daemon's RPC server to stop.
+ *
+ * If a server handle is recorded in [server], it is stopped
+ * gracefully, the handle is cleared, and the result is `ok.
+ * Without a handle the result is `error with an explanatory message.
+ *)
+and proc_exit_daemon () =
+  if !debug then Syslog.notice "remote call: exit_daemon";
+
+  (* Stop first, clear afterwards: if stopping raises, the handle is
+   * still recorded.
+   *)
+  let shutdown srv =
+    Rpc_server.stop_server ~graceful:true srv;
+    server := None;
+    `ok
+  in
+  match !server with
+  | Some srv -> shutdown srv
+  | None -> `error "exit_daemon: no server handle"
+
(* Reload the jobs file. *)
and reload_file () =
let file = sprintf "%s/jobs.cmo" !jobsdir in
) StringMap.empty js in
dependencies := map in
- (* Re-evaluate all jobs. *)
- let jobs = reevaluate_jobs (StringMap.keys !jobs) in
- run_jobs jobs
+ (* Re-evaluate all when jobs. *)
+ reevaluate_whenjobs ~onload:true (StringMap.keys !jobs);
+
+ (* Schedule the next every job to run. *)
+ schedule_next_everyjob ()
-(* Re-evaluate each named job, in a loop until we reach a fixpoint.
- * Return the names of all the jobs that need to be run.
+(* Re-evaluate each named when-statement job, in a loop until we reach
+ * a fixpoint. Run those that need to be run. every-statement jobs
+ * are ignored here.
*)
-and reevaluate_jobs jobnames =
+and reevaluate_whenjobs ?(onload=false) jobnames =
let rec loop set jobnames =
let set' =
List.fold_left (
with Not_found -> assert false in
assert (jobname = job.job_name);
- let r, job' = job_evaluate job !variables in
+ let r, job' =
+ try job_evaluate job !variables onload
+ with Invalid_argument err | Failure err ->
+ Syslog.error "error evaluating job %s (at %s): %s"
+ jobname (Camlp4.PreCast.Ast.Loc.to_string job.job_loc) err;
+ false, job in
+
jobs := StringMap.add jobname job' !jobs;
if !debug then
set'
in
let set = loop StringSet.empty jobnames in
- StringSet.elements set
+ let jobnames = StringSet.elements set in
+ (* Ensure the jobs always run in predictable (name) order. *)
+ let jobnames = List.sort compare_jobnames jobnames in
+ List.iter run_job
+ (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames)
-and run_jobs jobnames =
- let run_job job =
- Syslog.notice "running %s" job.job_name;
- () (* XXX *)
+(* Schedule the next every-statement job to run, if there is one. We
+ * look at the every jobs, work out the time that each must run at,
+ * pick the job(s) which must run soonest, and schedule a timer to run
+ * them. When the timer fires, it runs those jobs, then calls this
+ * function again.
+ *)
+and schedule_next_everyjob () =
+ let t = time () in
+
+ (* Get only everyjobs. *)
+ let jobs = StringMap.values !jobs in
+ let jobs = filter_map (
+ function
+ | { job_cond = Every_job period } as job -> Some (job, period)
+ | { job_cond = When_job _ } -> None
+ ) jobs in
+
+ (* Map everyjob to next time it must run. *)
+ let jobs = List.map (
+ fun (job, period) ->
+ let t' = next_periodexpr t period in
+ assert (t' > t); (* serious bug in next_periodexpr if false *)
+ job, t'
+ ) jobs in
+
+ (* Sort, soonest first. *)
+ let jobs = List.sort (fun (_,a) (_,b) -> compare a b) jobs in
+
+ if !debug then (
+ List.iter (
+ fun (job, t) ->
+ Syslog.notice "%s: next scheduled run at %s"
+ job.job_name (string_of_time_t t)
+ ) jobs
+ );
+
+ (* Pick the job(s) which run soonest. [pick] relies on the list
+ * being sorted soonest first: it returns the time of the head
+ * element together with every following job that shares that
+ * time. For an empty list it returns the sentinel time 0., which
+ * the [t > 0.] test below treats as nothing to schedule.
+ *)
+ let rec pick = function
+ | [] -> 0., []
+ | [j, t] -> t, [j]
+ | (j1, t) :: (j2, t') :: _ when t < t' -> t, [j1]
+ | (j1, t) :: (((j2, t') :: _) as rest) -> t, (j1 :: snd (pick rest))
in
+ let t, jobs = pick jobs in
+
+ if t > 0. then (
+ if jobs <> [] then (
+ (* Ensure the jobs always run in predictable (name) order. *)
+ let jobs =
+ List.sort (fun {job_name = a} {job_name = b} -> compare_jobnames a b)
+ jobs in
+
+ if !debug then
+ Syslog.notice "scheduling job(s) %s to run at %s"
+ (String.concat ", " (List.map (fun { job_name = name } -> name) jobs))
+ (string_of_time_t t);
+
+ (* Schedule them to run at time t. *)
+ let g = Unixqueue.new_group esys in
+ (* Delay from now until [t], clamped at zero in case [t] has
+ * already passed by the time we get here.
+ *)
+ let t_diff = t -. Unix.time () in
+ let t_diff = if t_diff < 0. then 0. else t_diff in
+ let run_jobs () =
+ Unixqueue.clear esys g; (* Delete the timer. *)
+ List.iter run_job jobs;
+ schedule_next_everyjob ()
+ in
+ (* NOTE(review): presumably a weak timer does not by itself keep
+ * the event loop running -- confirm against the Unixqueue
+ * documentation.
+ *)
+ Unixqueue.weak_once esys g t_diff run_jobs;
+ )
+ )
- List.iter run_job
- (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames)
+(* Format the Unix time [t] (seconds since the epoch, as a float) as
+ * YYYY-MM-DD HH:MM:SS UTC, using the broken-down UTC time returned
+ * by [gmtime].
+ *)
+and string_of_time_t t =
+  let { tm_year = year; tm_mon = month; tm_mday = day;
+        tm_hour = hour; tm_min = minute; tm_sec = second } = gmtime t in
+  (* [gmtime] counts years from 1900 and months from 0. *)
+  sprintf "%04d-%02d-%02d %02d:%02d:%02d UTC"
+    (year + 1900) (month + 1) day hour minute second
+
+(* Run [job] in a forked child process.
+ *
+ * The parent increments the JOBSERIAL variable, logs the run, creates
+ * a fresh temporary directory, forks, and records
+ * PID -> (job, directory) in [running] so that [handle_sigchld] can
+ * clean up when the child exits.
+ *
+ * The child changes into the temporary directory, exports every
+ * variable plus JOBNAME into its environment, writes the job's shell
+ * fragment to a file called script and executes it with $SHELL (or
+ * /bin/sh). The child never returns from this function: whatever
+ * goes wrong, it ends in [_exit 1].
+ *
+ * Raises whatever [tmpdir] or [fork] raise; if [fork] fails the
+ * temporary directory is removed before the exception propagates.
+ *)
+and run_job job =
+  (* Increment JOBSERIAL. *)
+  let serial =
+    match StringMap.find "JOBSERIAL" !variables with
+    | T_int serial ->
+      let serial = succ_big_int serial in
+      variables := StringMap.add "JOBSERIAL" (T_int serial) !variables;
+      serial
+    | _ -> assert false in
+
+  Syslog.notice "running %s (JOBSERIAL=%s)"
+    job.job_name (string_of_big_int serial);
+
+  (* Create a temporary directory. The current directory of the job
+   * will be in this directory. The directory is removed when the
+   * child process exits.
+   *)
+  let dir = tmpdir () in
+
+  (* Without a child there will never be a SIGCHLD to trigger the
+   * cleanup, so remove the directory here if fork fails.
+   *)
+  let pid =
+    try fork ()
+    with exn -> (cleanup_job job dir; raise exn) in
+
+  if pid = 0 then ( (* child process running the job *)
+    (* Everything the child does is guarded: an exception escaping
+     * from here would unwind into the daemon's own code and leave a
+     * second copy of the daemon running in the child.
+     *)
+    (try
+       chdir dir;
+
+       (* Set environment variables corresponding to each variable. *)
+       StringMap.iter
+         (fun name value -> putenv name (string_of_variable value))
+         !variables;
+
+       (* Set the $JOBNAME environment variable. *)
+       putenv "JOBNAME" job.job_name;
+
+       (* Create a temporary file containing the shell script fragment. *)
+       let script = dir // "script" in
+       let chan = open_out script in
+       output_string chan job.job_script.sh_script;
+       close_out chan;
+       chmod script 0o700;
+
+       let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
+
+       (* Execute the shell script. *)
+       (try execvp shell [| shell; "-c"; script |]
+        with Unix_error (err, fn, _) ->
+          Syslog.error "%s failed: %s: %s" fn script (error_message err))
+     with exn ->
+       (* Best effort only: logging must not stop us reaching _exit. *)
+       (try
+          Syslog.error "cannot start job %s: %s"
+            job.job_name (Printexc.to_string exn)
+        with _ -> ())
+    );
+    _exit 1
+  );
+
+  (* Remember this PID, the job and the temporary directory, so we
+   * can clean up when the child exits.
+   *)
+  running := IntMap.add pid (job, dir) !running
+
+(* Create a new, private (mode 0700) directory under
+ * [Filename.temp_dir_name] and return its path. The directory name
+ * is whenjobs followed by the hex MD5 digest of 16 bytes read from
+ * /dev/urandom.
+ *
+ * Raises [Sys_error] or [End_of_file] if /dev/urandom cannot be
+ * read, and [Unix_error] if the directory cannot be created.
+ *)
+and tmpdir () =
+  let chan = open_in "/dev/urandom" in
+  let data = String.create 16 in
+  (* Do not leak the channel if the read fails. *)
+  (try really_input chan data 0 (String.length data)
+   with exn -> (close_in_noerr chan; raise exn));
+  close_in chan;
+  let data = Digest.to_hex (Digest.string data) in
+  let dir = Filename.temp_dir_name // sprintf "whenjobs%s" data in
+  mkdir dir 0o700;
+  dir
+
+(* This is called when a job (child process) exits.
+ *
+ * Several children may have exited by the time the handler runs,
+ * because pending SIGCHLD signals are not queued individually. So
+ * keep reaping with a non-blocking waitpid until no exited child is
+ * left. Each reaped PID found in [running] is removed from the map
+ * and its temporary directory is cleaned up; PIDs we do not know
+ * about are reaped and otherwise ignored.
+ *
+ * [Unix_error] from waitpid (for example when there are no children
+ * at all) ends the loop quietly.
+ *)
+and handle_sigchld _ =
+  let rec reap () =
+    let pid, _status = waitpid [WNOHANG] 0 in
+    if pid > 0 then begin
+      (* Look up the PID in the running jobs map. *)
+      let entry =
+        try Some (IntMap.find pid !running) with Not_found -> None in
+      (match entry with
+       | Some (job, dir) ->
+         running := IntMap.remove pid !running;
+         cleanup_job job dir
+       | None -> ());
+      reap ()
+    end
+  in
+  try reap () with Unix_error _ -> ()
+
+(* Remove the temporary directory [dir] that was created for [job],
+ * by running rm -rf on it. A non-zero exit status from the command
+ * is logged; nothing is raised for it.
+ *)
+and cleanup_job job dir =
+  (* Quote the path for the shell instead of assuming it is free of
+   * metacharacters: the directory lives under
+   * [Filename.temp_dir_name], which can come from the environment.
+   *)
+  let cmd = sprintf "rm -rf %s" (Filename.quote dir) in
+  let status = Sys.command cmd in
+  if status <> 0 then
+    Syslog.error "%s: could not remove %s: rm exited with status %d"
+      job.job_name dir status
+
+(* Intelligent comparison of job names.
+ *
+ * If both names consist of the prefix "job$" followed by something
+ * [int_of_string] accepts, they are ordered by that number, so that
+ * job$2 sorts before job$10. In every other case the names are
+ * ordered by the ordinary string comparison.
+ *
+ * Returns a negative number, zero or a positive number, like
+ * [compare].
+ *)
+and compare_jobnames name1 name2 =
+  (* The number following the job$ prefix, if [name] has that shape. *)
+  let auto_index name =
+    let len = String.length name in
+    if len > 4 && String.sub name 0 4 = "job$" then
+      (try Some (int_of_string (String.sub name 4 (len - 4)))
+       with Failure _ -> None)
+    else None
+  in
+  match auto_index name1, auto_index name2 with
+  | Some i1, Some i2 -> compare i1 i2
+  | (None, _) | (_, None) -> compare name1 name2
+(* Run the shared event system [esys], on which the RPC server and
+ * the every-job timers are registered.
+ * NOTE(review): presumably this returns only once the event system
+ * has nothing left to process -- confirm against the Unixqueue
+ * documentation.
+ *)
let main_loop () =
Unixqueue.run esys