(* $HOME/.whenjobs *)
let jobsdir = ref ""
-(* The state. *)
+(* The state.
+ *
+ * Note that whenever this is updated, you need to consider if you
+ * should call 'save_variables ()' (which persists the variables to a
+ * file). XXX We should replace this ref with an accessor
+ * function.
+ *)
let state = ref Whenstate.empty
+(* Format used to save variables. Note we can't allow any internal
+ * types to "escape" into this definition, else the file format will
+ * change when parts of the program change.
+ *)
+type variables_file_v1 = (string * variable_v1) list
+and variable_v1 =
+ | Vv1_unit
+ | Vv1_bool of bool
+ | Vv1_string of string
+ | Vv1_int of big_int
+ | Vv1_float of float
+
+let variable_of_variable_v1 = function
+ | Vv1_unit -> T_unit
+ | Vv1_bool b -> T_bool b
+ | Vv1_string s -> T_string s
+ | Vv1_int i -> T_int i
+ | Vv1_float f -> T_float f
+
+let variable_v1_of_variable = function
+ | T_unit -> Vv1_unit
+ | T_bool b -> Vv1_bool b
+ | T_string s -> Vv1_string s
+ | T_int i -> Vv1_int i
+ | T_float f -> Vv1_float f
+
(* Jobs that are running: a map of PID -> (job, tmpdir, serial, start_time).
* Note that the job may no longer exist *OR* it may have been renamed,
* eg. if the jobs file was reloaded.
~proc_set_variables
~proc_get_job_names
~proc_test_variables
+ ~proc_ping_daemon
+ ~proc_whisper_variables
(Rpc_server.Unix addr)
Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
Rpc.Socket
(* Handle SIGCHLD to clean up jobs. *)
Sys.set_signal Sys.sigchld (Sys.Signal_handle handle_sigchld);
- (* Initialize the variables. *)
- state := Whenstate.set_variable !state "JOBSERIAL" (T_int zero_big_int)
+ (* Load or initialize the variables. *)
+ let variables_file = sprintf "%s/variables" !jobsdir in
+ state :=
+ try
+ let chan = open_in variables_file in
+ let r = load_variables !state chan in
+ close_in chan;
+ r
+ with
+ | Sys_error _ ->
+ Whenstate.set_variable !state "JOBSERIAL" (T_int zero_big_int)
+
+(* Try to load the variables from the file. If the file exists and
+ * cannot be read, raise an exception.
+ *)
+and load_variables state chan =
+ let signature = input_line chan in
+ if signature = "WHENJOBS VARIABLES VERSION 1" then (
+ let variables : variables_file_v1 = input_value chan in
+ List.fold_left (
+ fun state (n, v) ->
+ Whenstate.set_variable state n (variable_of_variable_v1 v)
+ ) state variables
+ ) else (* in future, other signatures, but for now ... *)
+ failwith (sprintf "cannot read variables file: invalid signature: %s"
+ signature)
+
+and save_variables () =
+ let variables_file = sprintf "%s/variables" !jobsdir in
+ let new_file = variables_file ^ ".new" in
+ let chan = open_out new_file in
+ fprintf chan "WHENJOBS VARIABLES VERSION 1\n";
+ let variables = Whenstate.get_variables !state in
+ let variables =
+ List.map (fun (n, v) -> n, variable_v1_of_variable v) variables in
+ output_value chan variables;
+
+ (* Try to arrange that the new file is updated atomically. *)
+ flush chan;
+ Netsys_posix.fsync (descr_of_out_channel chan);
+ close_out chan;
+ rename new_file variables_file
and proc_reload_file () =
if !debug then Syslog.notice "remote call: reload_file";
- try reload_file (); `ok
+ try reload_files (); `ok
with Failure err -> `error err
and proc_set_variable (name, value) =
let jobnames, state' = reevaluate_whenjobs !state jobs in
let state' = run_whenjobs state' jobnames in
state := state';
+ save_variables ();
`ok
with
| Some s ->
Rpc_server.stop_server ~graceful:true s;
server := None;
+ Gc.compact (); (* force the server handle to get cleaned up now *)
`ok
and proc_get_jobs () =
let job = Whenstate.get_job !state jobname in
let state' = run_job !state job in
state := state';
+ save_variables ();
`ok
with
| Not_found -> `error "job not found"
let jobnames, state' = reevaluate_whenjobs !state jobs in
let state' = run_whenjobs state' jobnames in
state := state';
+ save_variables ();
`ok
with
(* Return the names. *)
Array.of_list jobnames
-(* Reload the jobs file. *)
-and reload_file () =
- let file = sprintf "%s/jobs.cmo" !jobsdir in
+and proc_ping_daemon () = `ok
+
+and proc_whisper_variables vars =
+ try
+ let vars = Array.map (
+ fun { Whenproto_aux.sv_name = name; sv_value = value } ->
+ name, variable_of_rpc value
+ ) vars in
+ let vars = Array.to_list vars in
+
+ if !debug then
+ Syslog.notice "remote call: whisper_variables (%s)"
+ (String.concat " "
+ (List.map (
+ fun (name, value) ->
+ sprintf "%s=%s" name (string_of_variable value)
+ ) vars));
+
+ List.iter (fun (name, _) -> check_valid_variable_name name) vars;
+
+ (* Update all the variables atomically. *)
+ let s = List.fold_left (
+ fun s (name, value) -> Whenstate.set_variable s name value
+ ) !state vars in
+ state := s;
+ save_variables ();
+
+ (* .. but don't reevaluate or run jobs. *)
+
+ `ok
+ with
+ Failure msg -> `error msg
+
+(* Reload the jobs file(s). *)
+and reload_files () =
+ (* Get the highest numbered dir/jobs__*.cmo (bytecode) or
+ * dir/jobs__*.cmxs (native code) file and load it. Delete
+ * lower-numbered (== older) files.
+ *)
+ let filename =
+ let suffix, slen =
+ if not Dynlink.is_native then ".cmo", 4 else ".cmxs", 5 in
+ let dir = !jobsdir in
+ let files = Array.to_list (Sys.readdir dir) in
+ let times = filter_map (
+ fun file ->
+ if not (string_startswith file "jobs__") ||
+ not (string_endswith file suffix) then
+ None
+ else (
+ let len = String.length file in
+ let t = String.sub file 6 (len-slen-6) in
+ (* Use int64 because t won't necessarily fit into 31 bit int. *)
+ try Some (Int64.of_string t)
+ with Failure "int_of_string" -> assert false
+ )
+ ) files in
+ let times = List.rev (List.sort compare times) in
+ match times with
+ | [] -> None
+ | x::xs ->
+ (* Unlink the older files. *)
+ List.iter (
+ fun t ->
+ try unlink (dir // sprintf "jobs__%Ld%s" t suffix)
+ with Unix_error _ -> ()
+ ) xs;
+ (* Return the newest (highest numbered) file. *)
+ Some (dir // sprintf "jobs__%Ld%s" x suffix) in
(* As we are reloading the file, we want to create a new state
* that has no jobs, but has all the variables from the previous
Whenfile.init s;
let s =
- try
- Dynlink.loadfile file;
- let s = Whenfile.get_state () in
- Syslog.notice "loaded %d job(s) from %s" (Whenstate.nr_jobs s) file;
+ match filename with
+ | None ->
+ (* no jobs file, return the same state *)
+ Syslog.notice "no jobs file found";
s
- with
- | Dynlink.Error err ->
- let err = Dynlink.error_message err in
- Syslog.error "error loading jobs: %s" err;
- failwith err
- | exn ->
- failwith (Printexc.to_string exn) in
+ | Some filename ->
+ try
+ Dynlink.loadfile filename;
+ let s = Whenfile.get_state () in
+ Syslog.notice "loaded %d job(s)" (Whenstate.nr_jobs s);
+ s
+ with
+ | Dynlink.Error err ->
+ let err = Dynlink.error_message err in
+ Syslog.error "error loading jobs: %s" err;
+ failwith err
+ | exn ->
+ failwith (Printexc.to_string exn) in
let s = Whenstate.copy_prev_state !state s in
state := s;
let jobnames, state' = reevaluate_whenjobs ~onload:true !state jobs in
let state' = run_whenjobs state' jobnames in
state := state';
+ save_variables ();
(* Schedule the next every job to run. *)
schedule_next_everyjob ()
delete_timer_group (); (* Delete the timer. *)
let state' = List.fold_left run_job !state jobs in
state := state';
+ save_variables ();
schedule_next_everyjob ()
in
Unixqueue.weak_once esys g t_diff run_jobs;