Fix typo in documentation of 'whenjobs --test' option.
[whenjobs.git] / daemon / daemon.ml
1 (* whenjobs
2  * Copyright (C) 2012 Red Hat Inc.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  *)
18
19 open Whenutils
20 open Whenexpr
21
22 open Big_int
23 open Unix
24 open Printf
25
26 (* See [exit.c]. *)
27 external _exit : int -> 'a = "whenjobs__exit"
28
29 (* $HOME/.whenjobs *)
30 let jobsdir = ref ""
31
32 (* The state. *)
33 let state = ref Whenstate.empty
34
35 (* Jobs that are running: a map of PID -> (job, tmpdir, serial, start_time).
36  * Note that the job may no longer exist *OR* it may have been renamed,
37  * eg. if the jobs file was reloaded.
38  *)
39 let runningmap = ref IntMap.empty
40
41 (* Serial numbers of running jobs.  Map of serial -> PID (in runningmap). *)
42 let serialmap = ref BigIntMap.empty
43
44 (* Was debugging requested on the command line? *)
45 let debug = ref false
46
47 (* The server. *)
48 let server = ref None
49
50 let esys = Unixqueue.standard_event_system ()
51
52 (* The timer.  It's convenient to have this as a global variable
53  * because (a) there should only be one timer (which fires when the
54  * soonest every-job becomes ready), and (b) it's complicated to track
55  * that timer and avoid it getting double-scheduled (eg.  when we
56  * reload the jobs file) without having a global variable.
57  *)
58 let timer_group = ref None
59
60 let rec init j d =
61   jobsdir := j;
62   debug := d;
63
64   Whenlock.create_lock !jobsdir;
65
66   (* Remove old socket if it exists. *)
67   let addr = sprintf "%s/socket" !jobsdir in
68   (try unlink addr with Unix_error _ -> ());
69
70   (* Create the Unix domain socket server. *)
71   server := Some (
72     Whenproto_srv.When.V1.create_server
73       ~proc_reload_file
74       ~proc_set_variable
75       ~proc_get_variable
76       ~proc_get_variable_names
77       ~proc_exit_daemon
78       ~proc_get_jobs
79       ~proc_cancel_job
80       ~proc_start_job
81       ~proc_get_job
82       ~proc_set_variables
83       ~proc_get_job_names
84       ~proc_test_variables
85       ~proc_ping_daemon
86       (Rpc_server.Unix addr)
87       Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
88       Rpc.Socket
89       esys
90   );
91
92   (* Handle SIGCHLD to clean up jobs. *)
93   Sys.set_signal Sys.sigchld (Sys.Signal_handle handle_sigchld);
94
95   (* Initialize the variables. *)
96   state := Whenstate.set_variable !state "JOBSERIAL" (T_int zero_big_int)
97
98 and proc_reload_file () =
99   if !debug then Syslog.notice "remote call: reload_file";
100
101   try reload_file (); `ok
102   with Failure err -> `error err
103
104 and proc_set_variable (name, value) =
105   if !debug then Syslog.notice "remote call: set_variable %s" name;
106
107   try
108     check_valid_variable_name name;
109
110     let value = variable_of_rpc value in
111     state := Whenstate.set_variable !state name value;
112
113     (* Which jobs need to be re-evaluated? *)
114     let jobs = Whenstate.get_dependencies !state [name] in
115     let jobnames, state' = reevaluate_whenjobs !state jobs in
116     let state' = run_whenjobs state' jobnames in
117     state := state';
118
119     `ok
120   with
121     Failure msg -> `error msg
122
123 and proc_get_variable name =
124   if !debug then Syslog.notice "remote call: get_variable %s" name;
125
126   rpc_of_variable (Whenstate.get_variable !state name)
127
128 and proc_get_variable_names () =
129   if !debug then Syslog.notice "remote call: get_variable_names";
130
131   let vars = Whenstate.get_variable_names !state in
132
133   (* Return variable names as a sorted array. *)
134   let vars = Array.of_list vars in
135   Array.sort compare vars;
136   vars
137
138 and proc_exit_daemon () =
139   if !debug then Syslog.notice "remote call: exit_daemon";
140
141   match !server with
142   | None ->
143     `error "exit_daemon: no server handle"
144   | Some s ->
145     Rpc_server.stop_server ~graceful:true s;
146     server := None;
147     `ok
148
149 and proc_get_jobs () =
150   let running = Array.of_list (IntMap.values !runningmap) in
151   Array.map (
152     fun (job, dir, serial, start_time) ->
153       { Whenproto_aux.job_name = job.job_name;
154         job_serial = string_of_big_int serial;
155         job_tmpdir = dir; job_start_time = Int64.of_float start_time }
156   ) running
157
158 and proc_cancel_job serial =
159   try
160     let serial = big_int_of_string serial in
161     let pid = BigIntMap.find serial !serialmap in
162     kill pid 15;
163     `ok
164   with
165   | Not_found -> `error "job not found"
166   | exn -> `error (Printexc.to_string exn)
167
168 and proc_start_job jobname =
169   try
170     let job = Whenstate.get_job !state jobname in
171     let state' = run_job !state job in
172     state := state';
173     `ok
174   with
175   | Not_found -> `error "job not found"
176   | exn -> `error (Printexc.to_string exn)
177
178 and proc_get_job serial =
179   try
180     let serial = big_int_of_string serial in
181     let pid = BigIntMap.find serial !serialmap in
182     let job, dir, serial, start_time = IntMap.find pid !runningmap in
183     { Whenproto_aux.job_name = job.job_name;
184       job_serial = string_of_big_int serial;
185       job_tmpdir = dir; job_start_time = Int64.of_float start_time }
186   with
187   | Not_found -> failwith "job not found"
188   | exn -> failwith (Printexc.to_string exn)
189
190 and proc_set_variables vars =
191   try
192     let vars = Array.map (
193       fun { Whenproto_aux.sv_name = name; sv_value = value } ->
194         name, variable_of_rpc value
195     ) vars in
196     let vars = Array.to_list vars in
197
198     if !debug then
199       Syslog.notice "remote call: set_variables (%s)"
200         (String.concat " "
201            (List.map (
202              fun (name, value) ->
203                sprintf "%s=%s" name (string_of_variable value)
204             ) vars));
205
206     List.iter (fun (name, _) -> check_valid_variable_name name) vars;
207
208     (* Update all the variables atomically. *)
209     let s = List.fold_left (
210       fun s (name, value) -> Whenstate.set_variable s name value
211     ) !state vars in
212     state := s;
213
214     (* Which jobs need to be re-evaluated? *)
215     let jobs = Whenstate.get_dependencies !state (List.map fst vars) in
216     let jobnames, state' = reevaluate_whenjobs !state jobs in
217     let state' = run_whenjobs state' jobnames in
218     state := state';
219
220     `ok
221   with
222     Failure msg -> `error msg
223
224 and proc_get_job_names () =
225   Array.of_list (Whenstate.get_job_names !state)
226
227 and proc_test_variables vars =
228   (* This is the same as proc_set_variables, except that it doesn't
229    * update the state, it just returns the jobs that *would* run if
230    * these variables were set to these values.
231    *)
232   let vars = Array.map (
233     fun { Whenproto_aux.sv_name = name; sv_value = value } ->
234       name, variable_of_rpc value
235   ) vars in
236   let vars = Array.to_list vars in
237
238   if !debug then
239     Syslog.notice "remote call: test_variables (%s)"
240       (String.concat " "
241          (List.map (
242            fun (name, value) ->
243              sprintf "%s=%s" name (string_of_variable value)
244           ) vars));
245
246   List.iter (fun (name, _) -> check_valid_variable_name name) vars;
247
248   (* Update all the variables atomically. *)
249   let state = List.fold_left (
250     fun s (name, value) -> Whenstate.set_variable s name value
251   ) !state vars in
252
253   (* Which jobs WOULD be re-evaluated? *)
254   let jobs = Whenstate.get_dependencies state (List.map fst vars) in
255   let jobnames, _ = reevaluate_whenjobs state jobs in
256
257   (* Return the names. *)
258   Array.of_list jobnames
259
260 and proc_ping_daemon () = `ok
261
262 (* Reload the jobs file. *)
263 and reload_file () =
264   let file = sprintf "%s/jobs.cmo" !jobsdir in
265
266   (* As we are reloading the file, we want to create a new state
267    * that has no jobs, but has all the variables from the previous
268    * state.
269    *)
270   let s = Whenstate.copy_variables !state Whenstate.empty in
271   Whenfile.init s;
272
273   let s =
274     try
275       Dynlink.loadfile file;
276       let s = Whenfile.get_state () in
277       Syslog.notice "loaded %d job(s) from %s" (Whenstate.nr_jobs s) file;
278       s
279     with
280     | Dynlink.Error err ->
281       let err = Dynlink.error_message err in
282       Syslog.error "error loading jobs: %s" err;
283       failwith err
284     | exn ->
285       failwith (Printexc.to_string exn) in
286
287   let s = Whenstate.copy_prev_state !state s in
288   state := s;
289
290   (* Re-evaluate all when jobs. *)
291   let jobs = Whenstate.get_whenjobs !state in
292   let jobnames, state' = reevaluate_whenjobs ~onload:true !state jobs in
293   let state' = run_whenjobs state' jobnames in
294   state := state';
295
296   (* Schedule the next every job to run. *)
297   schedule_next_everyjob ()
298
299 (* Re-evaluate each when-statement job, in a loop until we reach
300  * a fixpoint.  Return the list of job names that should run and
301  * the updated state.
302  *)
303 and reevaluate_whenjobs ?onload state jobs =
304   let rec loop (set, state) jobs =
305     let set', state' =
306       List.fold_left (
307         fun (set, state) job ->
308           let r, state' =
309             try Whenstate.evaluate_whenjob ?onload state job
310             with Invalid_argument err | Failure err ->
311               Syslog.error "error evaluating job %s (at %s): %s"
312                 job.job_name (Camlp4.PreCast.Ast.Loc.to_string job.job_loc) err;
313               false, state in
314
315           if !debug then
316             Syslog.notice "evaluate %s -> %b\n" job.job_name r;
317
318           (if r then StringSet.add job.job_name set else set), state'
319       ) (set, state) jobs in
320     (* reached a fixpoint? *)
321     if StringSet.compare set set' <> 0 then
322       loop (set', state') jobs
323     else
324       (set', state')
325   in
326   let set, state = loop (StringSet.empty, state) jobs in
327   let jobnames = StringSet.elements set in
328
329   (* Ensure the jobs always run in predictable (name) order. *)
330   let jobnames = List.sort compare_jobnames jobnames in
331   jobnames, state
332
333 and run_whenjobs state jobnames =
334   (* Run the jobs. *)
335   let jobs = List.map (Whenstate.get_job state) jobnames in
336   List.fold_left run_job state jobs
337
338 (* Schedule the next every-statement job to run, if there is one.  We
339  * look at the every jobs, work out the time that each must run at,
340  * pick the job(s) which must run soonest, and schedule a timer to run
341  * them.  When the timer fires, it runs those jobs, then calls this
342  * function again.
343  *)
344 and schedule_next_everyjob () =
345   let t = time () in
346
347   (* Get only everyjobs. *)
348   let jobs = Whenstate.get_everyjobs !state in
349   let jobs = List.map (
350     function
351     | { job_cond = Every_job period } as job -> (job, period)
352     | { job_cond = When_job _ } -> assert false
353   ) jobs in
354
355   (* Map everyjob to next time it must run. *)
356   let jobs = List.map (
357     fun (job, period) ->
358       let t' = next_periodexpr t period in
359       assert (t' > t); (* serious bug in next_periodexpr if false *)
360       job, t'
361   ) jobs in
362
363   (* Sort, soonest first. *)
364   let jobs = List.sort (fun (_,a) (_,b) -> compare a b) jobs in
365
366   if !debug then (
367     List.iter (
368       fun (job, t) ->
369         Syslog.notice "%s: next scheduled run at %s"
370           job.job_name (string_of_time_t t)
371     ) jobs
372   );
373
374   (* Pick the job(s) which run soonest. *)
375   let rec pick = function
376     | [] -> 0., []
377     | [j, t] -> t, [j]
378     | (j1, t) :: (j2, t') :: _ when t < t' -> t, [j1]
379     | (j1, t) :: (((j2, t') :: _) as rest) -> t, (j1 :: snd (pick rest))
380   in
381   let t, jobs = pick jobs in
382
383   if t > 0. then (
384     if jobs <> [] then (
385       (* Ensure the jobs always run in predictable (name) order. *)
386       let jobs =
387         List.sort (fun {job_name = a} {job_name = b} -> compare_jobnames a b)
388           jobs in
389
390       if !debug then
391         Syslog.notice "scheduling job(s) %s to run at %s"
392           (String.concat ", " (List.map (fun { job_name = name } -> name) jobs))
393           (string_of_time_t t);
394
395       (* Schedule them to run at time t. *)
396       let g = new_timer_group () in
397       let t_diff = t -. Unix.time () in
398       let t_diff = if t_diff < 0. then 0. else t_diff in
399       let run_jobs () =
400         delete_timer_group ();          (* Delete the timer. *)
401         let state' = List.fold_left run_job !state jobs in
402         state := state';
403         schedule_next_everyjob ()
404       in
405       Unixqueue.weak_once esys g t_diff run_jobs;
406     )
407   )
408
409 and new_timer_group () =
410   delete_timer_group ();
411   let g = Unixqueue.new_group esys in
412   timer_group := Some g;
413   g
414
415 and delete_timer_group () =
416   match !timer_group with
417   | None -> ()
418   | Some g ->
419     Unixqueue.clear esys g;
420     timer_group := None
421
422 and run_job state job =
423   (* Increment JOBSERIAL. *)
424   let serial, state =
425     match Whenstate.get_variable state "JOBSERIAL" with
426     | T_int serial ->
427       let serial = succ_big_int serial in
428       let state' = Whenstate.set_variable state "JOBSERIAL" (T_int serial) in
429       serial, state'
430     | _ -> assert false in
431
432   (* Call the pre-condition script.  Note this may decide not to run
433    * the job by returning false.
434    *)
435   let pre_condition () =
436     match job.job_pre with
437     | None -> true
438     | Some pre ->
439       let rs = ref [] in
440       IntMap.iter (
441         fun pid (job, _, serial, start_time) ->
442           let r = { pirun_job_name = job.job_name;
443                     pirun_serial = serial;
444                     pirun_start_time = start_time;
445                     pirun_pid = pid } in
446           rs := r :: !rs
447       ) !runningmap;
448       let preinfo = {
449         pi_job_name = job.job_name;
450         pi_serial = serial;
451         pi_variables = Whenstate.get_variables state;
452         pi_running = !rs;
453       } in
454       pre preinfo
455   in
456   if pre_condition () then (
457     Syslog.notice "running %s (JOBSERIAL=%s)"
458       job.job_name (string_of_big_int serial);
459
460     (* Create a temporary directory.  The current directory of the job
461      * will be in this directory.  The directory is removed when the
462      * child process exits.
463      *)
464     let dir = tmpdir () in
465
466     let pid = fork () in
467     if pid = 0 then ( (* child process running the job *)
468       chdir dir;
469
470       (* Set environment variables corresponding to each variable. *)
471       List.iter
472         (fun (name, value) -> putenv name (string_of_variable value))
473         (Whenstate.get_variables state);
474
475       (* Set the $JOBNAME environment variable. *)
476       putenv "JOBNAME" job.job_name;
477
478       (* Create a temporary file containing the shell script fragment. *)
479       let script = dir // "script.sh" in
480       let chan = open_out script in
481       fprintf chan "set -e\n"; (* So that jobs exit on error. *)
482       output_string chan job.job_script.sh_script;
483       close_out chan;
484       chmod script 0o700;
485
486       let shell = try getenv "SHELL" with Not_found -> "/bin/sh" in
487
488       (* Set output to file. *)
489       let output = dir // "output.txt" in
490       let fd = openfile output [O_WRONLY; O_CREAT; O_TRUNC; O_NOCTTY] 0o600 in
491       dup2 fd stdout;
492       dup2 fd stderr;
493       close fd;
494
495       (* Execute the shell script. *)
496       (try execvp shell [| shell; "-c"; script |];
497        with Unix_error (err, fn, _) ->
498          Syslog.error "%s failed: %s: %s" fn script (error_message err)
499       );
500       _exit 1
501     );
502
503     (* Remember this PID, the job and the temporary directory, so we
504      * can clean up when the child exits.
505      *)
506     runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
507     serialmap := BigIntMap.add serial pid !serialmap;
508
509     state
510   )
511   else (
512     Syslog.notice "not running %s (JOBSERIAL=%s) because pre() condition returned false"
513       job.job_name (string_of_big_int serial);
514
515     state
516   )
517
518 and tmpdir () =
519   let chan = open_in "/dev/urandom" in
520   let data = String.create 16 in
521   really_input chan data 0 (String.length data);
522   close_in chan;
523   let data = Digest.to_hex (Digest.string data) in
524   let dir = Filename.temp_dir_name // sprintf "whenjobs%s" data in
525   mkdir dir 0o700;
526   dir
527
528 (* This is called when a job (child process) exits. *)
529 and handle_sigchld _ =
530   try
531     let pid, status = waitpid [WNOHANG] 0 in
532     if pid > 0 then (
533       (* Look up the PID in the running jobs map. *)
534       let job, dir, serial, time = IntMap.find pid !runningmap in
535       runningmap := IntMap.remove pid !runningmap;
536       serialmap := BigIntMap.remove serial !serialmap;
537       post_job job dir serial time status
538     )
539   with Unix_error _ | Not_found -> ()
540
541 and post_job job dir serial time status =
542   (* If there is a post function, run it. *)
543   (match job.job_post with
544   | None -> ()
545   | Some post ->
546     let code =
547       match status with
548       | WEXITED c -> c
549       | WSIGNALED s | WSTOPPED s -> 1 in
550     let result = {
551       res_job_name = job.job_name;
552       res_serial = serial;
553       res_code = code;
554       res_tmpdir = dir;
555       res_output = dir // "output.txt";
556       res_start_time = time
557     } in
558     try post result
559     with
560     | Failure msg ->
561       Syslog.error "job %s post function failed: %s" job.job_name msg
562     | exn ->
563       Syslog.error "job %s post function exception: %s"
564         job.job_name (Printexc.to_string exn)
565   );
566
567   (* This should be safe because the path cannot contain shell metachars. *)
568   let cmd = sprintf "rm -rf '%s'" dir in
569   ignore (Sys.command cmd)
570
571 (* Intelligent comparison of job names. *)
572 and compare_jobnames name1 name2 =
573   try
574     let len1 = String.length name1
575     and len2 = String.length name2 in
576     if len1 > 4 && len2 > 4 &&
577       String.sub name1 0 4 = "job$" && String.sub name2 0 4 = "job$"
578     then (
579       let i1 = int_of_string (String.sub name1 4 (len1-4)) in
580       let i2 = int_of_string (String.sub name2 4 (len2-4)) in
581       compare i1 i2
582     )
583     else raise Not_found
584   with _ ->
585     compare name1 name2
586
587 let main_loop () =
588   Unixqueue.run esys