~proc_cancel_job
~proc_start_job
~proc_get_job
+ ~proc_set_variables
+ ~proc_get_job_names
+ ~proc_test_variables
+ ~proc_ping_daemon
+ ~proc_whisper_variables
(Rpc_server.Unix addr)
Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
Rpc.Socket
and proc_reload_file () =
if !debug then Syslog.notice "remote call: reload_file";
- try reload_file (); `ok
+ try reload_files (); `ok
with Failure err -> `error err
and proc_set_variable (name, value) =
state := Whenstate.set_variable !state name value;
(* Which jobs need to be re-evaluated? *)
- let jobs = Whenstate.get_dependencies !state name in
- reevaluate_whenjobs jobs;
+ let jobs = Whenstate.get_dependencies !state [name] in
+ let jobnames, state' = reevaluate_whenjobs !state jobs in
+ let state' = run_whenjobs state' jobnames in
+ state := state';
`ok
with
| Some s ->
Rpc_server.stop_server ~graceful:true s;
server := None;
+ Gc.compact (); (* force the server handle to get cleaned up now *)
`ok
and proc_get_jobs () =
and proc_start_job jobname =
try
let job = Whenstate.get_job !state jobname in
- run_job job;
+ let state' = run_job !state job in
+ state := state';
`ok
with
| Not_found -> `error "job not found"
| Not_found -> failwith "job not found"
| exn -> failwith (Printexc.to_string exn)
-(* Reload the jobs file. *)
-and reload_file () =
- let file = sprintf "%s/jobs.cmo" !jobsdir in
+and proc_set_variables vars =
+ try
+ let vars = Array.map (
+ fun { Whenproto_aux.sv_name = name; sv_value = value } ->
+ name, variable_of_rpc value
+ ) vars in
+ let vars = Array.to_list vars in
+
+ if !debug then
+ Syslog.notice "remote call: set_variables (%s)"
+ (String.concat " "
+ (List.map (
+ fun (name, value) ->
+ sprintf "%s=%s" name (string_of_variable value)
+ ) vars));
+
+ List.iter (fun (name, _) -> check_valid_variable_name name) vars;
+
+ (* Update all the variables atomically. *)
+ let s = List.fold_left (
+ fun s (name, value) -> Whenstate.set_variable s name value
+ ) !state vars in
+ state := s;
+
+ (* Which jobs need to be re-evaluated? *)
+ let jobs = Whenstate.get_dependencies !state (List.map fst vars) in
+ let jobnames, state' = reevaluate_whenjobs !state jobs in
+ let state' = run_whenjobs state' jobnames in
+ state := state';
+
+ `ok
+ with
+ Failure msg -> `error msg
+
+and proc_get_job_names () =
+ Array.of_list (Whenstate.get_job_names !state)
+
+and proc_test_variables vars =
+ (* This is the same as proc_set_variables, except that it doesn't
+ * update the state, it just returns the jobs that *would* run if
+ * these variables were set to these values.
+ *)
+ let vars = Array.map (
+ fun { Whenproto_aux.sv_name = name; sv_value = value } ->
+ name, variable_of_rpc value
+ ) vars in
+ let vars = Array.to_list vars in
+
+ if !debug then
+ Syslog.notice "remote call: test_variables (%s)"
+ (String.concat " "
+ (List.map (
+ fun (name, value) ->
+ sprintf "%s=%s" name (string_of_variable value)
+ ) vars));
+
+ List.iter (fun (name, _) -> check_valid_variable_name name) vars;
+
+ (* Update all the variables atomically. *)
+ let state = List.fold_left (
+ fun s (name, value) -> Whenstate.set_variable s name value
+ ) !state vars in
+
+ (* Which jobs WOULD be re-evaluated? *)
+ let jobs = Whenstate.get_dependencies state (List.map fst vars) in
+ let jobnames, _ = reevaluate_whenjobs state jobs in
+
+ (* Return the names. *)
+ Array.of_list jobnames
+
+and proc_ping_daemon () = `ok
+
+and proc_whisper_variables vars =
+ try
+ let vars = Array.map (
+ fun { Whenproto_aux.sv_name = name; sv_value = value } ->
+ name, variable_of_rpc value
+ ) vars in
+ let vars = Array.to_list vars in
+
+ if !debug then
+ Syslog.notice "remote call: whisper_variables (%s)"
+ (String.concat " "
+ (List.map (
+ fun (name, value) ->
+ sprintf "%s=%s" name (string_of_variable value)
+ ) vars));
+
+ List.iter (fun (name, _) -> check_valid_variable_name name) vars;
+
+ (* Update all the variables atomically. *)
+ let s = List.fold_left (
+ fun s (name, value) -> Whenstate.set_variable s name value
+ ) !state vars in
+ state := s;
+
+ (* .. but don't reevaluate or run jobs. *)
+
+ `ok
+ with
+ Failure msg -> `error msg
+
+(* Reload the jobs file(s). *)
+and reload_files () =
+ (* Get the highest numbered dir/jobs__*.cmo (bytecode) or
+ * dir/jobs__*.cmxs (native code) file and load it. Delete
+ * lower-numbered (== older) files.
+ *)
+ let filename =
+ let suffix, slen =
+ if not Dynlink.is_native then ".cmo", 4 else ".cmxs", 5 in
+ let dir = !jobsdir in
+ let files = Array.to_list (Sys.readdir dir) in
+ let times = filter_map (
+ fun file ->
+ if not (string_startswith file "jobs__") ||
+ not (string_endswith file suffix) then
+ None
+ else (
+ let len = String.length file in
+ let t = String.sub file 6 (len-slen-6) in
+ (* Use int64 because t won't necessarily fit into 31 bit int. *)
+ try Some (Int64.of_string t)
+ with Failure "int_of_string" -> assert false
+ )
+ ) files in
+ let times = List.rev (List.sort compare times) in
+ match times with
+ | [] -> None
+ | x::xs ->
+ (* Unlink the older files. *)
+ List.iter (
+ fun t ->
+ try unlink (dir // sprintf "jobs__%Ld%s" t suffix)
+ with Unix_error _ -> ()
+ ) xs;
+ (* Return the newest (highest numbered) file. *)
+ Some (dir // sprintf "jobs__%Ld%s" x suffix) in
(* As we are reloading the file, we want to create a new state
* that has no jobs, but has all the variables from the previous
Whenfile.init s;
let s =
- try
- Dynlink.loadfile file;
- let s = Whenfile.get_state () in
- Syslog.notice "loaded %d job(s) from %s" (Whenstate.nr_jobs s) file;
+ match filename with
+ | None ->
+ (* no jobs file, return the same state *)
+ Syslog.notice "no jobs file found";
s
- with
- | Dynlink.Error err ->
- let err = Dynlink.error_message err in
- Syslog.error "error loading jobs: %s" err;
- failwith err
- | exn ->
- failwith (Printexc.to_string exn) in
+ | Some filename ->
+ try
+ Dynlink.loadfile filename;
+ let s = Whenfile.get_state () in
+ Syslog.notice "loaded %d job(s)" (Whenstate.nr_jobs s);
+ s
+ with
+ | Dynlink.Error err ->
+ let err = Dynlink.error_message err in
+ Syslog.error "error loading jobs: %s" err;
+ failwith err
+ | exn ->
+ failwith (Printexc.to_string exn) in
let s = Whenstate.copy_prev_state !state s in
state := s;
(* Re-evaluate all when jobs. *)
- reevaluate_whenjobs ~onload:true (Whenstate.get_whenjobs !state);
+ let jobs = Whenstate.get_whenjobs !state in
+ let jobnames, state' = reevaluate_whenjobs ~onload:true !state jobs in
+ let state' = run_whenjobs state' jobnames in
+ state := state';
(* Schedule the next every job to run. *)
schedule_next_everyjob ()
(* Re-evaluate each when-statement job, in a loop until we reach
- * a fixpoint. Run those that need to be run.
+ * a fixpoint. Return the list of job names that should run and
+ * the updated state.
*)
-and reevaluate_whenjobs ?onload jobs =
- let rec loop set jobs =
- let set' =
+and reevaluate_whenjobs ?onload state jobs =
+ let rec loop (set, state) jobs =
+ let set', state' =
List.fold_left (
- fun set job ->
+ fun (set, state) job ->
let r, state' =
- try Whenstate.evaluate_whenjob ?onload !state job
+ try Whenstate.evaluate_whenjob ?onload state job
with Invalid_argument err | Failure err ->
Syslog.error "error evaluating job %s (at %s): %s"
job.job_name (Camlp4.PreCast.Ast.Loc.to_string job.job_loc) err;
- false, !state in
-
- state := state';
+ false, state in
if !debug then
Syslog.notice "evaluate %s -> %b\n" job.job_name r;
- if r then StringSet.add job.job_name set else set
- ) set jobs in
+ (if r then StringSet.add job.job_name set else set), state'
+ ) (set, state) jobs in
+ (* reached a fixpoint? *)
if StringSet.compare set set' <> 0 then
- loop set' jobs
+ loop (set', state') jobs
else
- set'
+ (set', state')
in
- let set = loop StringSet.empty jobs in
+ let set, state = loop (StringSet.empty, state) jobs in
let jobnames = StringSet.elements set in
(* Ensure the jobs always run in predictable (name) order. *)
let jobnames = List.sort compare_jobnames jobnames in
+ jobnames, state
+and run_whenjobs state jobnames =
(* Run the jobs. *)
- List.iter run_job (List.map (Whenstate.get_job !state) jobnames)
+ let jobs = List.map (Whenstate.get_job state) jobnames in
+ List.fold_left run_job state jobs
(* Schedule the next every-statement job to run, if there is one. We
* look at the every jobs, work out the time that each must run at,
let t_diff = if t_diff < 0. then 0. else t_diff in
let run_jobs () =
delete_timer_group (); (* Delete the timer. *)
- List.iter run_job jobs;
+ let state' = List.fold_left run_job !state jobs in
+ state := state';
schedule_next_everyjob ()
in
Unixqueue.weak_once esys g t_diff run_jobs;
Unixqueue.clear esys g;
timer_group := None
-and run_job job =
+and run_job state job =
(* Increment JOBSERIAL. *)
- let serial =
- match Whenstate.get_variable !state "JOBSERIAL" with
+ let serial, state =
+ match Whenstate.get_variable state "JOBSERIAL" with
| T_int serial ->
let serial = succ_big_int serial in
- state := Whenstate.set_variable !state "JOBSERIAL" (T_int serial);
- serial
+ let state' = Whenstate.set_variable state "JOBSERIAL" (T_int serial) in
+ serial, state'
| _ -> assert false in
(* Call the pre-condition script. Note this may decide not to run
let preinfo = {
pi_job_name = job.job_name;
pi_serial = serial;
- pi_variables = Whenstate.get_variables !state;
+ pi_variables = Whenstate.get_variables state;
pi_running = !rs;
} in
pre preinfo
(* Set environment variables corresponding to each variable. *)
List.iter
(fun (name, value) -> putenv name (string_of_variable value))
- (Whenstate.get_variables !state);
+ (Whenstate.get_variables state);
(* Set the $JOBNAME environment variable. *)
putenv "JOBNAME" job.job_name;
* can clean up when the child exits.
*)
runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
- serialmap := BigIntMap.add serial pid !serialmap
+ serialmap := BigIntMap.add serial pid !serialmap;
+
+ state
)
else (
Syslog.notice "not running %s (JOBSERIAL=%s) because pre() condition returned false"
job.job_name (string_of_big_int serial);
+
+ state
)
and tmpdir () =