From: Richard W.M. Jones Date: Tue, 21 Feb 2012 13:42:59 +0000 (+0000) Subject: Implement running of jobs. X-Git-Tag: 0.0.1~16 X-Git-Url: http://git.annexia.org/?a=commitdiff_plain;h=c6ac020d503360f4944fefcd91364c1f5b037c54;p=whenjobs.git Implement running of jobs. --- diff --git a/daemon/Makefile.am b/daemon/Makefile.am index 6068594..e388676 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -41,7 +41,7 @@ OBJECTS = \ # Daemon. noinst_LIBRARIES = libdaemon.a -libdaemon_a_SOURCES = syslog_c.c +libdaemon_a_SOURCES = exit.c syslog_c.c libdaemon_a_CFLAGS = -I$(shell $(OCAMLC) -where) whenproto_srv.ml whenproto_srv.mli: whenproto.x @@ -53,7 +53,7 @@ whenproto.x: ../lib/whenproto.x whenjobsd: ../lib/whenlib.cma $(OBJECTS) libdaemon.a $(OCAMLFIND) c -custom $(OCAMLCFLAGS) -ccopt -L../lib \ -linkpkg whenlib.cma \ - libdaemon_a-syslog_c.o \ + libdaemon_a-exit.o libdaemon_a-syslog_c.o \ $(OBJECTS) -o $@ # Rules for all OCaml files. diff --git a/daemon/daemon.ml b/daemon/daemon.ml index 347aff7..fd986fb 100644 --- a/daemon/daemon.ml +++ b/daemon/daemon.ml @@ -21,6 +21,9 @@ open Whenutils open Unix open Printf +(* See [exit.c]. *) +external _exit : int -> 'a = "whenjobs__exit" + (* All jobs that are loaded. Maps name -> [job] structure. *) let jobs = ref StringMap.empty @@ -36,12 +39,15 @@ let dependencies = ref StringMap.empty *) let variables : variables ref = ref StringMap.empty -(* Last time that an every job ran. See schedule_next_everyjob below. *) -let last_t = ref (time ()) - (* $HOME/.whenjobs *) let jobsdir = ref "" +(* Jobs that are running; map of PID -> (job, other data). Note that + * the job may no longer exist *OR* it may have been renamed, + * eg. if the jobs file was reloaded. + *) +let running = ref IntMap.empty + (* Was debugging requested on the command line? *) let debug = ref false @@ -49,7 +55,6 @@ let debug = ref false let server = ref None let esys = Unixqueue.standard_event_system () -let timer_group = ref None let rec init j d = jobsdir := j; @@ -72,7 +77,10 @@ let rec init j d = Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *) Rpc.Socket esys - ) + ); + + (* Handle SIGCHLD to clean up jobs. *) + Sys.set_signal Sys.sigchld (Sys.Signal_handle handle_sigchld) and proc_reload_file () = if !debug then Syslog.notice "remote call: reload_file"; @@ -164,7 +172,6 @@ and reload_file () = reevaluate_whenjobs (StringMap.keys !jobs); (* Schedule the next every job to run. *) - last_t := time (); schedule_next_everyjob () (* Re-evaluate each named when-statement job, in a loop until we reach @@ -210,11 +217,12 @@ and reevaluate_whenjobs jobnames = (* Schedule the next every-statement job to run, if there is one. We * look at the every jobs, work out the time that each must run at, * pick the job(s) which must run soonest, and schedule a timer to run - * them. When the timer fires, it runs those jobs, then call this - * function again. 'last_t' is the base time used for scheduling (or - * the time that the file was last reloaded). + * them. When the timer fires, it runs those jobs, then calls this + * function again. *) and schedule_next_everyjob () = + let t = time () in + (* Get only everyjobs. *) let jobs = StringMap.values !jobs in let jobs = filter_map ( @@ -226,8 +234,8 @@ and schedule_next_everyjob () = (* Map everyjob to next time it must run. *) let jobs = List.map ( fun (job, period) -> - let t' = next_periodexpr !last_t period in - assert (t' > !last_t); (* serious bug in next_periodexpr if false *) + let t' = next_periodexpr t period in + assert (t' > t); (* serious bug in next_periodexpr if false *) job, t' ) jobs in @@ -252,26 +260,26 @@ and schedule_next_everyjob () = let t, jobs = pick jobs in if t > 0. then ( - last_t := t; - if jobs <> [] then ( + (* Ensure the jobs always run in predictable (name) order. *) + let jobs = + List.sort (fun { job_name = a } { job_name = b } -> compare a b) jobs in + if !debug then Syslog.notice "scheduling job(s) %s to run at %s" (String.concat ", " (List.map (fun { job_name = name } -> name) jobs)) (string_of_time_t t); (* Schedule them to run at time t. *) - let g = new_timer_group () in - let w = Unixqueue.new_wait_id esys in + let g = Unixqueue.new_group esys in let t_diff = t -. Unix.time () in let t_diff = if t_diff < 0. then 0. else t_diff in - Unixqueue.add_weak_resource esys g (Unixqueue.Wait w, t_diff); - let run_jobs _ _ _ = + let run_jobs () = + Unixqueue.clear esys g; (* Delete the timer. *) List.iter run_job jobs; - delete_timer (); - schedule_next_everyjob (); + schedule_next_everyjob () in - Unixqueue.add_handler esys g run_jobs; + Unixqueue.weak_once esys g t_diff run_jobs; ) ) @@ -281,22 +289,72 @@ and string_of_time_t t = (1900+tm.tm_year) (1+tm.tm_mon) tm.tm_mday tm.tm_hour tm.tm_min tm.tm_sec -and new_timer_group () = - delete_timer (); - let g = Unixqueue.new_group esys in - timer_group := Some g; - g - -and delete_timer () = - match !timer_group with - | None -> () - | Some g -> - Unixqueue.clear esys g; - timer_group := None - and run_job job = Syslog.notice "running %s" job.job_name; - () (* XXX *) + + (* Create a temporary directory. The current directory of the job + * will be in this directory. The directory is removed when the + * child process exits. + *) + let dir = tmpdir () in + + let pid = fork () in + if pid = 0 then ( (* child process running the job *) + chdir dir; + + (* Set environment variables corresponding to each variable. *) + StringMap.iter + (fun name value -> putenv name (string_of_variable value)) !variables; + + (* Set the $JOBNAME environment variable. *) + putenv "JOBNAME" job.job_name; + + (* Create a temporary file containing the shell script fragment. *) + let script = dir // "script" in + let chan = open_out script in + output_string chan job.job_script.sh_script; + close_out chan; + chmod script 0o700; + + (* Execute the shell script. *) + (try execvp "bash" [| "bash"; "-c"; script |]; + with Unix_error (err, fn, _) -> + Syslog.error "%s failed: %s: %s" fn script (error_message err) + ); + _exit 1 + ); + + (* Remember this PID, the job and the temporary directory, so we + * can clean up when the child exits. + *) + running := IntMap.add pid (job, dir) !running + +and tmpdir () = + let chan = open_in "/dev/urandom" in + let data = String.create 16 in + really_input chan data 0 (String.length data); + close_in chan; + let data = Digest.to_hex (Digest.string data) in + let dir = Filename.temp_dir_name // sprintf "whenjobs%s" data in + mkdir dir 0o700; + dir + +(* This is called when a job (child process) exits. *) +and handle_sigchld _ = + try + let pid, status = waitpid [WNOHANG] 0 in + if pid > 0 then ( + (* Look up the PID in the running jobs map. *) + let job, dir = IntMap.find pid !running in + running := IntMap.remove pid !running; + cleanup_job job dir + ) + with Unix_error _ | Not_found -> () + +and cleanup_job job dir = + (* This should be safe because the path cannot contain shell metachars. *) + let cmd = sprintf "rm -rf '%s'" dir in + ignore (Sys.command cmd) let main_loop () = Unixqueue.run esys diff --git a/daemon/exit.c b/daemon/exit.c new file mode 100644 index 0000000..904e0d7 --- /dev/null +++ b/daemon/exit.c @@ -0,0 +1,41 @@ +/* whenjobs + * (C) Copyright 2012 Red Hat Inc. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +value whenjobs__exit (value) Noreturn; + +/* _exit : int -> 'a (does not return) */ +value +whenjobs__exit (value statusv) +{ + CAMLparam1 (statusv); + int status = Int_val (statusv); + + _exit (status); +}