X-Git-Url: http://git.annexia.org/?p=whenjobs.git;a=blobdiff_plain;f=daemon%2Fdaemon.ml;h=6f5edfbaf34367d40f3d4d57e7c1184e86f2fa3a;hp=fba3ae5114a76856dac3043ac06e302aa246604f;hb=902e1e977991e0e2121c0d9a8417f1d157b87d87;hpb=61cad7bbaf63389b520b695eefdd735bc11a8aa6 diff --git a/daemon/daemon.ml b/daemon/daemon.ml index fba3ae5..6f5edfb 100644 --- a/daemon/daemon.ml +++ b/daemon/daemon.ml @@ -18,9 +18,13 @@ open Whenutils +open Big_int open Unix open Printf +(* See [exit.c]. *) +external _exit : int -> 'a = "whenjobs__exit" + (* All jobs that are loaded. Maps name -> [job] structure. *) let jobs = ref StringMap.empty @@ -39,9 +43,18 @@ let variables : variables ref = ref StringMap.empty (* $HOME/.whenjobs *) let jobsdir = ref "" +(* Jobs that are running; map of PID -> (job, other data). Note that + * the job may no longer exist *OR* it may have been renamed, + * eg. if the jobs file was reloaded. + *) +let running = ref IntMap.empty + (* Was debugging requested on the command line? *) let debug = ref false +(* The server. *) +let server = ref None + let esys = Unixqueue.standard_event_system () let rec init j d = @@ -54,17 +67,26 @@ let rec init j d = let addr = sprintf "%s/socket" !jobsdir in (try unlink addr with Unix_error _ -> ()); - ignore ( + server := Some ( Whenproto_srv.When.V1.create_server ~proc_reload_file ~proc_set_variable ~proc_get_variable ~proc_get_variable_names + ~proc_exit_daemon (Rpc_server.Unix addr) Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *) Rpc.Socket esys - ) + ); + + (* Handle SIGCHLD to clean up jobs. *) + Sys.set_signal Sys.sigchld (Sys.Signal_handle handle_sigchld); + + (* Initialize the variables. XXX Eventually this will be saved + * and loaded from a persistent store. + *) + variables := StringMap.add "JOBSERIAL" (T_int zero_big_int) !variables and proc_reload_file () = if !debug then Syslog.notice "remote call: reload_file"; @@ -75,13 +97,35 @@ and proc_reload_file () = and proc_set_variable (name, value) = if !debug then Syslog.notice "remote call: set_variable %s" name; - let value = variable_of_rpc value in - variables := StringMap.add name value !variables; + try + (* Don't permit certain names. *) + if name = "JOBSERIAL" then + failwith "JOBSERIAL variable cannot be set"; + + let len = String.length name in + if len = 0 then + failwith "variable name is an empty string"; + if name.[0] <> '_' && not (isalpha name.[0]) then + failwith "variable name must start with alphabetic character or underscore"; + + let rec loop i = + if i >= len then () + else if name.[i] <> '_' && not (isalnum name.[i]) then + failwith "variable name contains non-alphanumeric non-underscore character" + else loop (i+1) + in + loop 1; - (* Which jobs need to be re-evaluated? *) - let jobnames = try StringMap.find name !dependencies with Not_found -> [] in - let jobs = reevaluate_jobs jobnames in - run_jobs jobs + let value = variable_of_rpc value in + variables := StringMap.add name value !variables; + + (* Which jobs need to be re-evaluated? *) + let jobnames = try StringMap.find name !dependencies with Not_found -> [] in + reevaluate_whenjobs jobnames; + + `ok + with + Failure msg -> `error msg and proc_get_variable name = if !debug then Syslog.notice "remote call: get_variable %s" name; @@ -101,6 +145,17 @@ and proc_get_variable_names () = Array.sort compare vars; vars +and proc_exit_daemon () = + if !debug then Syslog.notice "remote call: exit_daemon"; + + match !server with + | None -> + `error "exit_daemon: no server handle" + | Some s -> + Rpc_server.stop_server ~graceful:true s; + server := None; + `ok + (* Reload the jobs file. *) and reload_file () = let file = sprintf "%s/jobs.cmo" !jobsdir in @@ -142,14 +197,17 @@ and reload_file () = ) StringMap.empty js in dependencies := map in - (* Re-evaluate all jobs. *) - let jobs = reevaluate_jobs (StringMap.keys !jobs) in - run_jobs jobs + (* Re-evaluate all when jobs. *) + reevaluate_whenjobs ~onload:true (StringMap.keys !jobs); + + (* Schedule the next every job to run. *) + schedule_next_everyjob () -(* Re-evaluate each named job, in a loop until we reach a fixpoint. - * Return the names of all the jobs that need to be run. +(* Re-evaluate each named when-statement job, in a loop until we reach + * a fixpoint. Run those that need to be run. every-statement jobs + * are ignored here. *) -and reevaluate_jobs jobnames = +and reevaluate_whenjobs ?(onload=false) jobnames = let rec loop set jobnames = let set' = List.fold_left ( @@ -159,7 +217,13 @@ and reevaluate_jobs jobnames = with Not_found -> assert false in assert (jobname = job.job_name); - let r, job' = job_evaluate job !variables in + let r, job' = + try job_evaluate job !variables onload + with Invalid_argument err | Failure err -> + Syslog.error "error evaluating job %s (at %s): %s" + jobname (Camlp4.PreCast.Ast.Loc.to_string job.job_loc) err; + false, job in + jobs := StringMap.add jobname job' !jobs; if !debug then @@ -173,16 +237,183 @@ and reevaluate_jobs jobnames = set' in let set = loop StringSet.empty jobnames in - StringSet.elements set + let jobnames = StringSet.elements set in + (* Ensure the jobs always run in predictable (name) order. *) + let jobnames = List.sort compare_jobnames jobnames in + List.iter run_job + (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames) -and run_jobs jobnames = - let run_job job = - Syslog.notice "running %s" job.job_name; - () (* XXX *) +(* Schedule the next every-statement job to run, if there is one. We + * look at the every jobs, work out the time that each must run at, + * pick the job(s) which must run soonest, and schedule a timer to run + * them. When the timer fires, it runs those jobs, then calls this + * function again. + *) +and schedule_next_everyjob () = + let t = time () in + + (* Get only everyjobs. *) + let jobs = StringMap.values !jobs in + let jobs = filter_map ( + function + | { job_cond = Every_job period } as job -> Some (job, period) + | { job_cond = When_job _ } -> None + ) jobs in + + (* Map everyjob to next time it must run. *) + let jobs = List.map ( + fun (job, period) -> + let t' = next_periodexpr t period in + assert (t' > t); (* serious bug in next_periodexpr if false *) + job, t' + ) jobs in + + (* Sort, soonest first. *) + let jobs = List.sort (fun (_,a) (_,b) -> compare a b) jobs in + + if !debug then ( + List.iter ( + fun (job, t) -> + Syslog.notice "%s: next scheduled run at %s" + job.job_name (string_of_time_t t) + ) jobs + ); + + (* Pick the job(s) which run soonest. *) + let rec pick = function + | [] -> 0., [] + | [j, t] -> t, [j] + | (j1, t) :: (j2, t') :: _ when t < t' -> t, [j1] + | (j1, t) :: (((j2, t') :: _) as rest) -> t, (j1 :: snd (pick rest)) in + let t, jobs = pick jobs in + + if t > 0. then ( + if jobs <> [] then ( + (* Ensure the jobs always run in predictable (name) order. *) + let jobs = + List.sort (fun {job_name = a} {job_name = b} -> compare_jobnames a b) + jobs in + + if !debug then + Syslog.notice "scheduling job(s) %s to run at %s" + (String.concat ", " (List.map (fun { job_name = name } -> name) jobs)) + (string_of_time_t t); + + (* Schedule them to run at time t. *) + let g = Unixqueue.new_group esys in + let t_diff = t -. Unix.time () in + let t_diff = if t_diff < 0. then 0. else t_diff in + let run_jobs () = + Unixqueue.clear esys g; (* Delete the timer. *) + List.iter run_job jobs; + schedule_next_everyjob () + in + Unixqueue.weak_once esys g t_diff run_jobs; + ) + ) - List.iter run_job - (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames) +and string_of_time_t t = + let tm = gmtime t in + sprintf "%04d-%02d-%02d %02d:%02d:%02d UTC" + (1900+tm.tm_year) (1+tm.tm_mon) tm.tm_mday + tm.tm_hour tm.tm_min tm.tm_sec + +and run_job job = + let () = + (* Increment JOBSERIAL. *) + let serial = + match StringMap.find "JOBSERIAL" !variables with + | T_int serial -> + let serial = succ_big_int serial in + variables := StringMap.add "JOBSERIAL" (T_int serial) !variables; + serial + | _ -> assert false in + + Syslog.notice "running %s (JOBSERIAL=%s)" + job.job_name (string_of_big_int serial) in + + (* Create a temporary directory. The current directory of the job + * will be in this directory. The directory is removed when the + * child process exits. + *) + let dir = tmpdir () in + + let pid = fork () in + if pid = 0 then ( (* child process running the job *) + chdir dir; + + (* Set environment variables corresponding to each variable. *) + StringMap.iter + (fun name value -> putenv name (string_of_variable value)) !variables; + + (* Set the $JOBNAME environment variable. *) + putenv "JOBNAME" job.job_name; + + (* Create a temporary file containing the shell script fragment. *) + let script = dir // "script" in + let chan = open_out script in + output_string chan job.job_script.sh_script; + close_out chan; + chmod script 0o700; + + let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in + + (* Execute the shell script. *) + (try execvp shell [| shell; "-c"; script |]; + with Unix_error (err, fn, _) -> + Syslog.error "%s failed: %s: %s" fn script (error_message err) + ); + _exit 1 + ); + + (* Remember this PID, the job and the temporary directory, so we + * can clean up when the child exits. + *) + running := IntMap.add pid (job, dir) !running + +and tmpdir () = + let chan = open_in "/dev/urandom" in + let data = String.create 16 in + really_input chan data 0 (String.length data); + close_in chan; + let data = Digest.to_hex (Digest.string data) in + let dir = Filename.temp_dir_name // sprintf "whenjobs%s" data in + mkdir dir 0o700; + dir + +(* This is called when a job (child process) exits. *) +and handle_sigchld _ = + try + let pid, status = waitpid [WNOHANG] 0 in + if pid > 0 then ( + (* Look up the PID in the running jobs map. *) + let job, dir = IntMap.find pid !running in + running := IntMap.remove pid !running; + cleanup_job job dir + ) + with Unix_error _ | Not_found -> () + +and cleanup_job job dir = + (* This should be safe because the path cannot contain shell metachars. *) + let cmd = sprintf "rm -rf '%s'" dir in + ignore (Sys.command cmd) + +(* Intelligent comparison of job names. *) +and compare_jobnames name1 name2 = + try + let len1 = String.length name1 + and len2 = String.length name2 in + if len1 > 4 && len2 > 4 && + String.sub name1 0 4 = "job$" && String.sub name2 0 4 = "job$" + then ( + let i1 = int_of_string (String.sub name1 4 (len1-4)) in + let i2 = int_of_string (String.sub name2 4 (len2-4)) in + compare i1 i2 + ) + else raise Not_found + with _ -> + compare name1 name2 let main_loop () = Unixqueue.run esys