(* The state. *)
let state = ref Whenstate.empty
-(* Jobs that are running: a map of PID -> (job, tmpdir, serial).
+(* Jobs that are running: a map of PID -> (job, tmpdir, serial, start_time).
* Note that the job may no longer exist *OR* it may have been renamed,
* eg. if the jobs file was reloaded.
*)
-let running = ref IntMap.empty
+let runningmap = ref IntMap.empty
+
+(* Serial numbers of running jobs.  Map of serial -> PID, where the PID
+ * is a key in runningmap.  An entry is added here whenever a job is
+ * added to runningmap, and removed when that job is removed from
+ * runningmap, so a running job can be found by serial number.
+ *)
+let serialmap = ref BigIntMap.empty
(* Was debugging requested on the command line? *)
let debug = ref false
~proc_get_variable
~proc_get_variable_names
~proc_exit_daemon
+ ~proc_get_jobs
+ ~proc_cancel_job
+ ~proc_start_job
(Rpc_server.Unix addr)
Rpc.Tcp (* not TCP, this is the same as SOCK_STREAM *)
Rpc.Socket
server := None;
`ok
+(* RPC handler: list the running jobs.
+ * Returns an array holding one Whenproto_aux job record for each
+ * entry currently in runningmap.
+ *)
+and proc_get_jobs () =
+  (* Convert one runningmap value into its RPC representation.  The
+   * serial is sent as a decimal string and the start time is converted
+   * with Int64.of_float, which discards any fractional part.
+   *)
+  let to_rpc_job (job, dir, serial, start_time) =
+    { Whenproto_aux.job_name = job.job_name;
+      job_serial = string_of_big_int serial;
+      job_tmpdir = dir;
+      job_start_time = Int64.of_float start_time } in
+  let entries = IntMap.values !runningmap in
+  Array.of_list (List.map to_rpc_job entries)
+
+(* RPC handler: cancel a running job.
+ * serial is the job serial number as a decimal string.  It is looked
+ * up in serialmap to find the PID of the job, and that process is
+ * sent SIGTERM.  The maps are not modified here.
+ * Returns `ok if the signal was sent, `error "job not found" if the
+ * serial is not in serialmap, or `error with the printed exception if
+ * anything else fails (eg. the string is not a number, or kill fails).
+ *)
+and proc_cancel_job serial =
+  try
+    let serial = big_int_of_string serial in
+    let pid = BigIntMap.find serial !serialmap in
+    (* Use the symbolic Sys.sigterm rather than a raw signal number, so
+     * that the runtime maps it to the SIGTERM of the platform.
+     *)
+    kill pid Sys.sigterm;
+    `ok
+  with
+  | Not_found -> `error "job not found"
+  | exn -> `error (Printexc.to_string exn)
+
+(* RPC handler: start a job immediately.
+ * jobname is looked up in the current state with Whenstate.get_job and
+ * the resulting job is passed to run_job.
+ * Returns `ok on success, `error "job not found" if Not_found is raised
+ * by the lookup or by run_job, or `error with the printed exception
+ * for any other failure.
+ *)
+and proc_start_job jobname =
+  try
+    run_job (Whenstate.get_job !state jobname);
+    `ok
+  with
+  | Not_found -> `error "job not found"
+  | exn -> `error (Printexc.to_string exn)
+
(* Reload the jobs file. *)
and reload_file () =
let file = sprintf "%s/jobs.cmo" !jobsdir in
Unixqueue.clear esys g;
timer_group := None
-and string_of_time_t t =
- let tm = gmtime t in
- sprintf "%04d-%02d-%02d %02d:%02d:%02d UTC"
- (1900+tm.tm_year) (1+tm.tm_mon) tm.tm_mday
- tm.tm_hour tm.tm_min tm.tm_sec
-
and run_job job =
(* Increment JOBSERIAL. *)
let serial =
(* Remember this PID, the job and the temporary directory, so we
* can clean up when the child exits.
*)
- running := IntMap.add pid (job, dir, serial) !running
+ runningmap := IntMap.add pid (job, dir, serial, time ()) !runningmap;
+ serialmap := BigIntMap.add serial pid !serialmap
and tmpdir () =
let chan = open_in "/dev/urandom" in
let pid, status = waitpid [WNOHANG] 0 in
if pid > 0 then (
(* Look up the PID in the running jobs map. *)
- let job, dir, serial = IntMap.find pid !running in
- running := IntMap.remove pid !running;
- cleanup_job job dir serial status
+ let job, dir, serial, time = IntMap.find pid !runningmap in
+ runningmap := IntMap.remove pid !runningmap;
+ serialmap := BigIntMap.remove serial !serialmap;
+ post_job job dir serial time status
)
with Unix_error _ | Not_found -> ()
-and cleanup_job job dir serial status =
- (* If there is a cleanup function, run it. *)
- (match job.job_cleanup with
+and post_job job dir serial time status =
+ (* If there is a post function, run it. *)
+ (match job.job_post with
| None -> ()
- | Some cleanup ->
+ | Some post ->
let code =
match status with
| WEXITED c -> c
res_serial = serial;
res_code = code;
res_tmpdir = dir;
- res_output = dir // "output.txt"
+ res_output = dir // "output.txt";
+ res_start_time = time
} in
- try cleanup result
+ try post result
with
| Failure msg ->
- Syslog.error "job %s cleanup function failed: %s" job.job_name msg
+ Syslog.error "job %s post function failed: %s" job.job_name msg
| exn ->
- Syslog.error "job %s cleanup function exception: %s"
+ Syslog.error "job %s post function exception: %s"
job.job_name (Printexc.to_string exn)
);