From 0f58f891d531defd1fa923dd2da93678c9c6f35b Mon Sep 17 00:00:00 2001 From: "Richard W.M. Jones" Date: Mon, 20 Feb 2012 16:02:06 +0000 Subject: [PATCH] Scheduling of every-jobs. --- daemon/Makefile.am | 2 +- daemon/daemon.ml | 121 ++++++++++++++++++++++++++++++++++++++++++++++------- lib/Makefile.am | 2 +- lib/whenutils.ml | 48 +++++++++++++++++++++ lib/whenutils.mli | 35 ++++++++++++++++ tools/Makefile.am | 4 +- 6 files changed, 193 insertions(+), 19 deletions(-) diff --git a/daemon/Makefile.am b/daemon/Makefile.am index 55ac8a1..6068594 100644 --- a/daemon/Makefile.am +++ b/daemon/Makefile.am @@ -17,7 +17,7 @@ sbin_SCRIPTS = whenjobsd -OCAMLPACKAGES = -package unix,num,camlp4.lib,rpc -I ../lib +OCAMLPACKAGES = -package unix,num,camlp4.lib,calendar,rpc -I ../lib OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES) OCAMLOPTFLAGS = $(OCAMLCFLAGS) diff --git a/daemon/daemon.ml b/daemon/daemon.ml index fba3ae5..80f2cbd 100644 --- a/daemon/daemon.ml +++ b/daemon/daemon.ml @@ -36,6 +36,9 @@ let dependencies = ref StringMap.empty *) let variables : variables ref = ref StringMap.empty +(* Last time that an every job ran. See schedule_next_everyjob below. *) +let last_t = ref (time ()) + (* $HOME/.whenjobs *) let jobsdir = ref "" @@ -43,6 +46,7 @@ let jobsdir = ref "" let debug = ref false let esys = Unixqueue.standard_event_system () +let timer_group = ref None let rec init j d = jobsdir := j; @@ -80,8 +84,7 @@ and proc_set_variable (name, value) = (* Which jobs need to be re-evaluated? *) let jobnames = try StringMap.find name !dependencies with Not_found -> [] in - let jobs = reevaluate_jobs jobnames in - run_jobs jobs + reevaluate_whenjobs jobnames and proc_get_variable name = if !debug then Syslog.notice "remote call: get_variable %s" name; @@ -142,14 +145,18 @@ and reload_file () = ) StringMap.empty js in dependencies := map in - (* Re-evaluate all jobs. *) - let jobs = reevaluate_jobs (StringMap.keys !jobs) in - run_jobs jobs + (* Re-evaluate all when jobs. *) + reevaluate_whenjobs (StringMap.keys !jobs); + + (* Schedule the next every job to run. *) + last_t := time (); + schedule_next_everyjob () -(* Re-evaluate each named job, in a loop until we reach a fixpoint. - * Return the names of all the jobs that need to be run. +(* Re-evaluate each named when-statement job, in a loop until we reach + * a fixpoint. Run those that need to be run. every-statement jobs + * are ignored here. *) -and reevaluate_jobs jobnames = +and reevaluate_whenjobs jobnames = let rec loop set jobnames = let set' = List.fold_left ( @@ -173,16 +180,100 @@ and reevaluate_jobs jobnames = set' in let set = loop StringSet.empty jobnames in - StringSet.elements set + let jobnames = StringSet.elements set in + List.iter run_job + (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames) -and run_jobs jobnames = - let run_job job = - Syslog.notice "running %s" job.job_name; - () (* XXX *) +(* Schedule the next every-statement job to run, if there is one. We + * look at the every jobs, work out the time that each must run at, + * pick the job(s) which must run soonest, and schedule a timer to run + * them. When the timer fires, it runs those jobs, then call this + * function again. 'last_t' is the base time used for scheduling (or + * the time that the file was last reloaded). + *) +and schedule_next_everyjob () = + (* Get only everyjobs. *) + let jobs = StringMap.values !jobs in + let jobs = filter_map ( + function + | { job_cond = Every_job period } as job -> Some (job, period) + | { job_cond = When_job _ } -> None + ) jobs in + + (* Map everyjob to next time it must run. *) + let jobs = List.map ( + fun (job, period) -> + let t' = next_periodexpr !last_t period in + assert (t' > !last_t); (* serious bug in next_periodexpr if false *) + job, t' + ) jobs in + + (* Sort, soonest first. *) + let jobs = List.sort (fun (_,a) (_,b) -> compare a b) jobs in + + if !debug then ( + List.iter ( + fun (job, t) -> + Syslog.notice "%s: next scheduled run at %s" + job.job_name (string_of_time_t t) + ) jobs + ); + + (* Pick the job(s) which run soonest. *) + let rec pick = function + | [] -> 0., [] + | [j, t] -> t, [j] + | (j1, t) :: (j2, t') :: _ when t < t' -> t, [j1] + | (j1, t) :: (((j2, t') :: _) as rest) -> t, (j1 :: snd (pick rest)) in + let t, jobs = pick jobs in + + if t > 0. then ( + last_t := t; + + if jobs <> [] then ( + if !debug then + Syslog.notice "scheduling job(s) %s to run at %s" + (String.concat ", " (List.map (fun { job_name = name } -> name) jobs)) + (string_of_time_t t); + + (* Schedule them to run at time t. *) + let g = new_timer_group () in + let w = Unixqueue.new_wait_id esys in + let t_diff = t -. Unix.time () in + let t_diff = if t_diff < 0. then 0. else t_diff in + Unixqueue.add_resource esys g (Unixqueue.Wait w, t_diff); + let run_jobs _ _ _ = + List.iter run_job jobs; + delete_timer (); + schedule_next_everyjob (); + in + Unixqueue.add_handler esys g run_jobs; + ) + ) - List.iter run_job - (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames) +and string_of_time_t t = + let tm = gmtime t in + sprintf "%04d-%02d-%02d %02d:%02d:%02d UTC" + (1900+tm.tm_year) (1+tm.tm_mon) tm.tm_mday + tm.tm_hour tm.tm_min tm.tm_sec + +and new_timer_group () = + delete_timer (); + let g = Unixqueue.new_group esys in + timer_group := Some g; + g + +and delete_timer () = + match !timer_group with + | None -> () + | Some g -> + Unixqueue.clear esys g; + timer_group := None + +and run_job job = + Syslog.notice "running %s" job.job_name; + () (* XXX *) let main_loop () = Unixqueue.run esys diff --git a/lib/Makefile.am b/lib/Makefile.am index 47285cd..898f204 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -18,7 +18,7 @@ libwhenjobsdir = $(libdir)/$(PACKAGE_NAME) libwhenjobs_SCRIPTS = whenlib.cma pa_when.cmo -OCAMLPACKAGES = -package unix,num,camlp4.lib,rpc +OCAMLPACKAGES = -package unix,num,camlp4.lib,calendar,rpc OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES) OCAMLOPTFLAGS = $(OCAMLCFLAGS) diff --git a/lib/whenutils.ml b/lib/whenutils.ml index 8cab30e..362f4f6 100644 --- a/lib/whenutils.ml +++ b/lib/whenutils.ml @@ -19,7 +19,10 @@ open Camlp4.PreCast open Ast +open CalendarLib + open Big_int +open Unix open Printf module StringMap = struct @@ -358,3 +361,48 @@ let job_evaluate job variables = job_prev_variables = variables } in let job = { job with job_private = jobp } in true, job + +let next_periodexpr = + (* Round up 'a' to the next multiple of 'i'. *) + let round_up_float a i = + let r = mod_float a i in + if r = 0. then a +. i else a +. (i -. r) + and round_up a i = + let r = a mod i in + if r = 0 then a + i else a + (i - r) + in + + fun t -> function + | Every_seconds i -> + let i = float_of_int i in + round_up_float t i + + | Every_years i -> + let tm = gmtime t in + + (* Round 'tm' up to the first day of the next year. *) + let year = round_up tm.tm_year i in + let tm = { tm with tm_sec = 0; tm_min = 0; tm_hour = 0; + tm_mday = 1; tm_mon = 0; tm_year = year } in + fst (mktime tm) + + | Every_days i -> + let t = Date.from_unixfloat t in + let t0 = Date.make 1970 1 1 in + + (* Number of whole days since Unix Epoch. *) + let nb_days = Date.Period.safe_nb_days (Date.sub t t0) in + + let nb_days = round_up nb_days i in + let t' = Date.add t0 (Date.Period.day nb_days) in + Date.to_unixfloat t' + + | Every_months i -> + (* Calculate number of whole months since Unix Epoch. *) + let tm = gmtime t in + let months = 12 * (tm.tm_year - 70) + tm.tm_mon in + + let months = round_up months i in + let t0 = Date.make 1970 1 1 in + let t' = Date.add t0 (Date.Period.month months) in + Date.to_unixfloat t' diff --git a/lib/whenutils.mli b/lib/whenutils.mli index c935906..2d5bbfa 100644 --- a/lib/whenutils.mli +++ b/lib/whenutils.mli @@ -166,3 +166,38 @@ val job_evaluate : job -> variables -> bool * job possibly-updated [job] structure. This is a no-op for 'every' jobs. *) + +val next_periodexpr : float -> periodexpr -> float +(** [next_periodexpr t period] returns the earliest event of [period] + strictly after time [t]. + + Visualising periods as repeated events on a timeline, this + returns [t']: + + {v + events: ---+---------+---------+---------+---------+---------+----- + times: t t' + } + + Note that [periodexpr] events are not necessarily regular. + eg. The start of a month is not a fixed number of seconds + after the start of the previous month. 'Epoch' refers + to the Unix Epoch (ie. 1970-01-01 00:00:00 UTC). + + If [period = Every_seconds i] then events are when + [t' mod i == 0] when t' is the number of seconds since + the Epoch. This returns the next t' > t. + + If [period = Every_days i] then events happen at + midnight UTC every [i] days since the Epoch. + This returns the next midnight > t. + + If [period = Every_months i] then events happen at + midnight UTC on the 1st day of the month every [i] months + since the Epoch. This returns midnight on the + 1st day of the next month > t. + + If [period = Every_years i] then events happen at + midnight UTC on the 1st day of the year when + [(y - 1970) mod i == 0]. This returns midnight on the + 1st day of the next year > t. *) diff --git a/tools/Makefile.am b/tools/Makefile.am index 1941e22..506bc1a 100644 --- a/tools/Makefile.am +++ b/tools/Makefile.am @@ -17,9 +17,9 @@ bin_SCRIPTS = whenjobs -OCAMLPACKAGES = -package unix,num,camlp4.lib,rpc +OCAMLPACKAGES = -package unix,num,camlp4.lib,calendar,rpc -I ../lib -OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES) -I ../lib +OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES) OCAMLOPTFLAGS = $(OCAMLCFLAGS) # These should be in alphabetical order. -- 1.8.3.1