Version 0.5
[whenjobs.git] / daemon / daemon.ml
1 (* whenjobs
2  * Copyright (C) 2012 Red Hat Inc.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  *)
18
19 open Whenutils
20 open Whenexpr
21
22 open Big_int
23 open Unix
24 open Printf
25
26 (* See [exit.c]. *)
27 external _exit : int -> 'a = "whenjobs__exit"
28
29 (* $HOME/.whenjobs *)
30 let jobsdir = ref ""
31
32 (* The state. *)
33 let state = ref Whenstate.empty
34
35 (* Jobs that are running: a map of PID -> (job, tmpdir, serial, start_time).
36  * Note that the job may no longer exist *OR* it may have been renamed,
37  * eg. if the jobs file was reloaded.
38  *)
39 let runningmap = ref IntMap.empty
40
41 (* Serial numbers of running jobs.  Map of serial -> PID (in runningmap). *)
42 let serialmap = ref BigIntMap.empty
43
44 (* Was debugging requested on the command line? *)
45 let debug = ref false
46
47 (* The server. *)
48 let server = ref None
49
50 let esys = Unixqueue.standard_event_system ()
51
52 (* The timer.  It's convenient to have this as a global variable
53  * because (a) there should only be one timer (which fires when the
54  * soonest every-job becomes ready), and (b) it's complicated to track
55  * that timer and avoid it getting double-scheduled (eg.  when we
56  * reload the jobs file) without having a global variable.
57  *)
58 let timer_group = ref None
59
60 let rec init j d =
61   jobsdir := j;
62   debug := d;
63
64   Whenlock.create_lock !jobsdir;
65
66   (* Remove old socket if it exists. *)
67   let addr = sprintf "%s/socket" !jobsdir in
68   (try unlink addr with Unix_error _ -> ());
69
70   (* Create the Unix domain socket server. *)
71   server := Some (
72     Whenproto_srv.When.V1.create_server
73       ~proc_reload_file
74       ~proc_set_variable
75       ~proc_get_variable
76       ~proc_get_variable_names
77       ~proc_exit_daemon
78       ~proc_get_jobs
79       ~proc_cancel_job
80       ~proc_start_job
81       ~proc_get_job
82       (Rpc_server.Unix addr)
83       Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
84       Rpc.Socket
85       esys
86   );
87
88   (* Handle SIGCHLD to clean up jobs. *)
89   Sys.set_signal Sys.sigchld (Sys.Signal_handle handle_sigchld);
90
91   (* Initialize the variables. *)
92   state := Whenstate.set_variable !state "JOBSERIAL" (T_int zero_big_int)
93
94 and proc_reload_file () =
95   if !debug then Syslog.notice "remote call: reload_file";
96
97   try reload_file (); `ok
98   with Failure err -> `error err
99
100 and proc_set_variable (name, value) =
101   if !debug then Syslog.notice "remote call: set_variable %s" name;
102
103   try
104     check_valid_variable_name name;
105
106     let value = variable_of_rpc value in
107     state := Whenstate.set_variable !state name value;
108
109     (* Which jobs need to be re-evaluated? *)
110     let jobs = Whenstate.get_dependencies !state name in
111     reevaluate_whenjobs jobs;
112
113     `ok
114   with
115     Failure msg -> `error msg
116
117 and proc_get_variable name =
118   if !debug then Syslog.notice "remote call: get_variable %s" name;
119
120   rpc_of_variable (Whenstate.get_variable !state name)
121
122 and proc_get_variable_names () =
123   if !debug then Syslog.notice "remote call: get_variable_names";
124
125   let vars = Whenstate.get_variable_names !state in
126
127   (* Return variable names as a sorted array. *)
128   let vars = Array.of_list vars in
129   Array.sort compare vars;
130   vars
131
132 and proc_exit_daemon () =
133   if !debug then Syslog.notice "remote call: exit_daemon";
134
135   match !server with
136   | None ->
137     `error "exit_daemon: no server handle"
138   | Some s ->
139     Rpc_server.stop_server ~graceful:true s;
140     server := None;
141     `ok
142
143 and proc_get_jobs () =
144   let running = Array.of_list (IntMap.values !runningmap) in
145   Array.map (
146     fun (job, dir, serial, start_time) ->
147       { Whenproto_aux.job_name = job.job_name;
148         job_serial = string_of_big_int serial;
149         job_tmpdir = dir; job_start_time = Int64.of_float start_time }
150   ) running
151
152 and proc_cancel_job serial =
153   try
154     let serial = big_int_of_string serial in
155     let pid = BigIntMap.find serial !serialmap in
156     kill pid 15;
157     `ok
158   with
159   | Not_found -> `error "job not found"
160   | exn -> `error (Printexc.to_string exn)
161
162 and proc_start_job jobname =
163   try
164     let job = Whenstate.get_job !state jobname in
165     run_job job;
166     `ok
167   with
168   | Not_found -> `error "job not found"
169   | exn -> `error (Printexc.to_string exn)
170
171 and proc_get_job serial =
172   try
173     let serial = big_int_of_string serial in
174     let pid = BigIntMap.find serial !serialmap in
175     let job, dir, serial, start_time = IntMap.find pid !runningmap in
176     { Whenproto_aux.job_name = job.job_name;
177       job_serial = string_of_big_int serial;
178       job_tmpdir = dir; job_start_time = Int64.of_float start_time }
179   with
180   | Not_found -> failwith "job not found"
181   | exn -> failwith (Printexc.to_string exn)
182
183 (* Reload the jobs file. *)
184 and reload_file () =
185   let file = sprintf "%s/jobs.cmo" !jobsdir in
186
187   (* As we are reloading the file, we want to create a new state
188    * that has no jobs, but has all the variables from the previous
189    * state.
190    *)
191   let s = Whenstate.copy_variables !state Whenstate.empty in
192   Whenfile.init s;
193
194   let s =
195     try
196       Dynlink.loadfile file;
197       let s = Whenfile.get_state () in
198       Syslog.notice "loaded %d job(s) from %s" (Whenstate.nr_jobs s) file;
199       s
200     with
201     | Dynlink.Error err ->
202       let err = Dynlink.error_message err in
203       Syslog.error "error loading jobs: %s" err;
204       failwith err
205     | exn ->
206       failwith (Printexc.to_string exn) in
207
208   let s = Whenstate.copy_prev_state !state s in
209   state := s;
210
211   (* Re-evaluate all when jobs. *)
212   reevaluate_whenjobs ~onload:true (Whenstate.get_whenjobs !state);
213
214   (* Schedule the next every job to run. *)
215   schedule_next_everyjob ()
216
217 (* Re-evaluate each when-statement job, in a loop until we reach
218  * a fixpoint.  Run those that need to be run.
219  *)
220 and reevaluate_whenjobs ?onload jobs =
221   let rec loop set jobs =
222     let set' =
223       List.fold_left (
224         fun set job ->
225           let r, state' =
226             try Whenstate.evaluate_whenjob ?onload !state job
227             with Invalid_argument err | Failure err ->
228               Syslog.error "error evaluating job %s (at %s): %s"
229                 job.job_name (Camlp4.PreCast.Ast.Loc.to_string job.job_loc) err;
230               false, !state in
231
232           state := state';
233
234           if !debug then
235             Syslog.notice "evaluate %s -> %b\n" job.job_name r;
236
237           if r then StringSet.add job.job_name set else set
238       ) set jobs in
239     if StringSet.compare set set' <> 0 then
240       loop set' jobs
241     else
242       set'
243   in
244   let set = loop StringSet.empty jobs in
245   let jobnames = StringSet.elements set in
246
247   (* Ensure the jobs always run in predictable (name) order. *)
248   let jobnames = List.sort compare_jobnames jobnames in
249
250   (* Run the jobs. *)
251   List.iter run_job (List.map (Whenstate.get_job !state) jobnames)
252
253 (* Schedule the next every-statement job to run, if there is one.  We
254  * look at the every jobs, work out the time that each must run at,
255  * pick the job(s) which must run soonest, and schedule a timer to run
256  * them.  When the timer fires, it runs those jobs, then calls this
257  * function again.
258  *)
259 and schedule_next_everyjob () =
260   let t = time () in
261
262   (* Get only everyjobs. *)
263   let jobs = Whenstate.get_everyjobs !state in
264   let jobs = List.map (
265     function
266     | { job_cond = Every_job period } as job -> (job, period)
267     | { job_cond = When_job _ } -> assert false
268   ) jobs in
269
270   (* Map everyjob to next time it must run. *)
271   let jobs = List.map (
272     fun (job, period) ->
273       let t' = next_periodexpr t period in
274       assert (t' > t); (* serious bug in next_periodexpr if false *)
275       job, t'
276   ) jobs in
277
278   (* Sort, soonest first. *)
279   let jobs = List.sort (fun (_,a) (_,b) -> compare a b) jobs in
280
281   if !debug then (
282     List.iter (
283       fun (job, t) ->
284         Syslog.notice "%s: next scheduled run at %s"
285           job.job_name (string_of_time_t t)
286     ) jobs
287   );
288
289   (* Pick the job(s) which run soonest. *)
290   let rec pick = function
291     | [] -> 0., []
292     | [j, t] -> t, [j]
293     | (j1, t) :: (j2, t') :: _ when t < t' -> t, [j1]
294     | (j1, t) :: (((j2, t') :: _) as rest) -> t, (j1 :: snd (pick rest))
295   in
296   let t, jobs = pick jobs in
297
298   if t > 0. then (
299     if jobs <> [] then (
300       (* Ensure the jobs always run in predictable (name) order. *)
301       let jobs =
302         List.sort (fun {job_name = a} {job_name = b} -> compare_jobnames a b)
303           jobs in
304
305       if !debug then
306         Syslog.notice "scheduling job(s) %s to run at %s"
307           (String.concat ", " (List.map (fun { job_name = name } -> name) jobs))
308           (string_of_time_t t);
309
310       (* Schedule them to run at time t. *)
311       let g = new_timer_group () in
312       let t_diff = t -. Unix.time () in
313       let t_diff = if t_diff < 0. then 0. else t_diff in
314       let run_jobs () =
315         delete_timer_group ();          (* Delete the timer. *)
316         List.iter run_job jobs;
317         schedule_next_everyjob ()
318       in
319       Unixqueue.weak_once esys g t_diff run_jobs;
320     )
321   )
322
323 and new_timer_group () =
324   delete_timer_group ();
325   let g = Unixqueue.new_group esys in
326   timer_group := Some g;
327   g
328
329 and delete_timer_group () =
330   match !timer_group with
331   | None -> ()
332   | Some g ->
333     Unixqueue.clear esys g;
334     timer_group := None
335
336 and run_job job =
337   (* Increment JOBSERIAL. *)
338   let serial =
339     match Whenstate.get_variable !state "JOBSERIAL" with
340     | T_int serial ->
341       let serial = succ_big_int serial in
342       state := Whenstate.set_variable !state "JOBSERIAL" (T_int serial);
343       serial
344     | _ -> assert false in
345
346   (* Call the pre-condition script.  Note this may decide not to run
347    * the job by returning false.
348    *)
349   let pre_condition () =
350     match job.job_pre with
351     | None -> true
352     | Some pre ->
353       let rs = ref [] in
354       IntMap.iter (
355         fun pid (job, _, serial, start_time) ->
356           let r = { pirun_job_name = job.job_name;
357                     pirun_serial = serial;
358                     pirun_start_time = start_time;
359                     pirun_pid = pid } in
360           rs := r :: !rs
361       ) !runningmap;
362       let preinfo = {
363         pi_job_name = job.job_name;
364         pi_serial = serial;
365         pi_variables = Whenstate.get_variables !state;
366         pi_running = !rs;
367       } in
368       pre preinfo
369   in
370   if pre_condition () then (
371     Syslog.notice "running %s (JOBSERIAL=%s)"
372       job.job_name (string_of_big_int serial);
373
374     (* Create a temporary directory.  The current directory of the job
375      * will be in this directory.  The directory is removed when the
376      * child process exits.
377      *)
378     let dir = tmpdir () in
379
380     let pid = fork () in
381     if pid = 0 then ( (* child process running the job *)
382       chdir dir;
383
384       (* Set environment variables corresponding to each variable. *)
385       List.iter
386         (fun (name, value) -> putenv name (string_of_variable value))
387         (Whenstate.get_variables !state);
388
389       (* Set the $JOBNAME environment variable. *)
390       putenv "JOBNAME" job.job_name;
391
392       (* Create a temporary file containing the shell script fragment. *)
393       let script = dir // "script.sh" in
394       let chan = open_out script in
395       fprintf chan "set -e\n"; (* So that jobs exit on error. *)
396       output_string chan job.job_script.sh_script;
397       close_out chan;
398       chmod script 0o700;
399
400       let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
401
402       (* Set output to file. *)
403       let output = dir // "output.txt" in
404       let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in
405       dup2 fd stdout;
406       dup2 fd stderr;
407       close fd;
408
409       (* Execute the shell script. *)
410       (try execvp shell [| shell; "-c"; script |];
411        with Unix_error (err, fn, _) ->
412          Syslog.error "%s failed: %s: %s" fn script (error_message err)
413       );
414       _exit 1
415     );
416
417     (* Remember this PID, the job and the temporary directory, so we
418      * can clean up when the child exits.
419      *)
420     runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
421     serialmap := BigIntMap.add serial pid !serialmap
422   )
423   else (
424     Syslog.notice "not running %s (JOBSERIAL=%s) because pre() condition returned false"
425       job.job_name (string_of_big_int serial);
426   )
427
428 and tmpdir () =
429   let chan = open_in "/dev/urandom" in
430   let data = String.create 16 in
431   really_input chan data 0 (String.length data);
432   close_in chan;
433   let data = Digest.to_hex (Digest.string data) in
434   let dir = Filename.temp_dir_name // sprintf "whenjobs%s" data in
435   mkdir dir 0o700;
436   dir
437
438 (* This is called when a job (child process) exits. *)
439 and handle_sigchld _ =
440   try
441     let pid, status = waitpid [WNOHANG] 0 in
442     if pid > 0 then (
443       (* Look up the PID in the running jobs map. *)
444       let job, dir, serial, time = IntMap.find pid !runningmap in
445       runningmap := IntMap.remove pid !runningmap;
446       serialmap := BigIntMap.remove serial !serialmap;
447       post_job job dir serial time status
448     )
449   with Unix_error _ | Not_found -> ()
450
451 and post_job job dir serial time status =
452   (* If there is a post function, run it. *)
453   (match job.job_post with
454   | None -> ()
455   | Some post ->
456     let code =
457       match status with
458       | WEXITED c -> c
459       | WSIGNALED s | WSTOPPED s -> 1 in
460     let result = {
461       res_job_name = job.job_name;
462       res_serial = serial;
463       res_code = code;
464       res_tmpdir = dir;
465       res_output = dir // "output.txt";
466       res_start_time = time
467     } in
468     try post result
469     with
470     | Failure msg ->
471       Syslog.error "job %s post function failed: %s" job.job_name msg
472     | exn ->
473       Syslog.error "job %s post function exception: %s"
474         job.job_name (Printexc.to_string exn)
475   );
476
477   (* This should be safe because the path cannot contain shell metachars. *)
478   let cmd = sprintf "rm -rf '%s'" dir in
479   ignore (Sys.command cmd)
480
481 (* Intelligent comparison of job names. *)
482 and compare_jobnames name1 name2 =
483   try
484     let len1 = String.length name1
485     and len2 = String.length name2 in
486     if len1 > 4 && len2 > 4 &&
487       String.sub name1 0 4 = "job$" && String.sub name2 0 4 = "job$"
488     then (
489       let i1 = int_of_string (String.sub name1 4 (len1-4)) in
490       let i2 = int_of_string (String.sub name2 4 (len2-4)) in
491       compare i1 i2
492     )
493     else raise Not_found
494   with _ ->
495     compare name1 name2
496
497 let main_loop () =
498   Unixqueue.run esys