Scheduling of every-jobs.
authorRichard W.M. Jones <rjones@redhat.com>
Mon, 20 Feb 2012 16:02:06 +0000 (16:02 +0000)
committerRichard W.M. Jones <rjones@redhat.com>
Mon, 20 Feb 2012 17:00:35 +0000 (17:00 +0000)
daemon/Makefile.am
daemon/daemon.ml
lib/Makefile.am
lib/whenutils.ml
lib/whenutils.mli
tools/Makefile.am

index 55ac8a1..6068594 100644 (file)
@@ -17,7 +17,7 @@
 
 sbin_SCRIPTS = whenjobsd
 
-OCAMLPACKAGES = -package unix,num,camlp4.lib,rpc -I ../lib
+OCAMLPACKAGES = -package unix,num,camlp4.lib,calendar,rpc -I ../lib
 
 OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES)
 OCAMLOPTFLAGS = $(OCAMLCFLAGS)
index fba3ae5..80f2cbd 100644 (file)
@@ -36,6 +36,9 @@ let dependencies = ref StringMap.empty
  *)
 let variables : variables ref = ref StringMap.empty
 
+(* Last time that an every job ran.  See schedule_next_everyjob below. *)
+let last_t = ref (time ())
+
 (* $HOME/.whenjobs *)
 let jobsdir = ref ""
 
@@ -43,6 +46,7 @@ let jobsdir = ref ""
 let debug = ref false
 
 let esys = Unixqueue.standard_event_system ()
+let timer_group = ref None
 
 let rec init j d =
   jobsdir := j;
@@ -80,8 +84,7 @@ and proc_set_variable (name, value) =
 
   (* Which jobs need to be re-evaluated? *)
   let jobnames = try StringMap.find name !dependencies with Not_found -> [] in
-  let jobs = reevaluate_jobs jobnames in
-  run_jobs jobs
+  reevaluate_whenjobs jobnames
 
 and proc_get_variable name =
   if !debug then Syslog.notice "remote call: get_variable %s" name;
@@ -142,14 +145,18 @@ and reload_file () =
     ) StringMap.empty js in
     dependencies := map in
 
-  (* Re-evaluate all jobs. *)
-  let jobs = reevaluate_jobs (StringMap.keys !jobs) in
-  run_jobs jobs
+  (* Re-evaluate all when jobs. *)
+  reevaluate_whenjobs (StringMap.keys !jobs);
+
+  (* Schedule the next every job to run. *)
+  last_t := time ();
+  schedule_next_everyjob ()
 
-(* Re-evaluate each named job, in a loop until we reach a fixpoint.
- * Return the names of all the jobs that need to be run.
+(* Re-evaluate each named when-statement job, in a loop until we reach
+ * a fixpoint.  Run those that need to be run.  every-statement jobs
+ * are ignored here.
  *)
-and reevaluate_jobs jobnames =
+and reevaluate_whenjobs jobnames =
   let rec loop set jobnames =
     let set' =
       List.fold_left (
@@ -173,16 +180,100 @@ and reevaluate_jobs jobnames =
       set'
   in
   let set = loop StringSet.empty jobnames in
-  StringSet.elements set
+  let jobnames = StringSet.elements set in
+  List.iter run_job
+    (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames)
 
-and run_jobs jobnames =
-  let run_job job =
-    Syslog.notice "running %s" job.job_name;
-    () (* XXX *)
+(* Schedule the next every-statement job to run, if there is one.  We
+ * look at the every jobs, work out the time that each must run at,
+ * pick the job(s) which must run soonest, and schedule a timer to run
+ * them.  When the timer fires, it runs those jobs, then call this
+ * function again.  'last_t' is the base time used for scheduling (or
+ * the time that the file was last reloaded).
+ *)
+and schedule_next_everyjob () =
+  (* Get only everyjobs. *)
+  let jobs = StringMap.values !jobs in
+  let jobs = filter_map (
+    function
+    | { job_cond = Every_job period } as job -> Some (job, period)
+    | { job_cond = When_job _ } -> None
+  ) jobs in
+
+  (* Map everyjob to next time it must run. *)
+  let jobs = List.map (
+    fun (job, period) ->
+      let t' = next_periodexpr !last_t period in
+      assert (t' > !last_t); (* serious bug in next_periodexpr if false *)
+      job, t'
+  ) jobs in
+
+  (* Sort, soonest first. *)
+  let jobs = List.sort (fun (_,a) (_,b) -> compare a b) jobs in
+
+  if !debug then (
+    List.iter (
+      fun (job, t) ->
+        Syslog.notice "%s: next scheduled run at %s"
+          job.job_name (string_of_time_t t)
+    ) jobs
+  );
+
+  (* Pick the job(s) which run soonest. *)
+  let rec pick = function
+    | [] -> 0., []
+    | [j, t] -> t, [j]
+    | (j1, t) :: (j2, t') :: _ when t < t' -> t, [j1]
+    | (j1, t) :: (((j2, t') :: _) as rest) -> t, (j1 :: snd (pick rest))
   in
+  let t, jobs = pick jobs in
+
+  if t > 0. then (
+    last_t := t;
+
+    if jobs <> [] then (
+      if !debug then
+        Syslog.notice "scheduling job(s) %s to run at %s"
+          (String.concat ", " (List.map (fun { job_name = name } -> name) jobs))
+          (string_of_time_t t);
+
+      (* Schedule them to run at time t. *)
+      let g = new_timer_group () in
+      let w = Unixqueue.new_wait_id esys in
+      let t_diff = t -. Unix.time () in
+      let t_diff = if t_diff < 0. then 0. else t_diff in
+      Unixqueue.add_resource esys g (Unixqueue.Wait w, t_diff);
+      let run_jobs _ _ _ =
+        List.iter run_job jobs;
+        delete_timer ();
+        schedule_next_everyjob ();
+      in
+      Unixqueue.add_handler esys g run_jobs;
+    )
+  )
 
-  List.iter run_job
-    (List.map (fun jobname -> StringMap.find jobname !jobs) jobnames)
+and string_of_time_t t =
+  let tm = gmtime t in
+  sprintf "%04d-%02d-%02d %02d:%02d:%02d UTC"
+    (1900+tm.tm_year) (1+tm.tm_mon) tm.tm_mday
+    tm.tm_hour tm.tm_min tm.tm_sec
+
+and new_timer_group () =
+  delete_timer ();
+  let g = Unixqueue.new_group esys in
+  timer_group := Some g;
+  g
+
+and delete_timer () =
+  match !timer_group with
+  | None -> ()
+  | Some g ->
+    Unixqueue.clear esys g;
+    timer_group := None
+
+and run_job job =
+  Syslog.notice "running %s" job.job_name;
+  () (* XXX *)
 
 let main_loop () =
   Unixqueue.run esys
index 47285cd..898f204 100644 (file)
@@ -18,7 +18,7 @@
 libwhenjobsdir = $(libdir)/$(PACKAGE_NAME)
 libwhenjobs_SCRIPTS = whenlib.cma pa_when.cmo
 
-OCAMLPACKAGES = -package unix,num,camlp4.lib,rpc
+OCAMLPACKAGES = -package unix,num,camlp4.lib,calendar,rpc
 
 OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES)
 OCAMLOPTFLAGS = $(OCAMLCFLAGS)
index 8cab30e..362f4f6 100644 (file)
 open Camlp4.PreCast
 open Ast
 
+open CalendarLib
+
 open Big_int
+open Unix
 open Printf
 
 module StringMap = struct
@@ -358,3 +361,48 @@ let job_evaluate job variables =
                    job_prev_variables = variables } in
       let job = { job with job_private = jobp } in
       true, job
+
+let next_periodexpr =
+  (* Round up 'a' to the next multiple of 'i'. *)
+  let round_up_float a i =
+    let r = mod_float a i in
+    if r = 0. then a +. i else a +. (i -. r)
+  and round_up a i =
+    let r = a mod i in
+    if r = 0 then a + i else a + (i - r)
+  in
+
+  fun t -> function
+  | Every_seconds i ->
+    let i = float_of_int i in
+    round_up_float t i
+
+  | Every_years i ->
+    let tm = gmtime t in
+
+    (* Round 'tm' up to the first day of the next year. *)
+    let year = round_up tm.tm_year i in
+    let tm = { tm with tm_sec = 0; tm_min = 0; tm_hour = 0;
+                       tm_mday = 1; tm_mon = 0; tm_year = year } in
+    fst (mktime tm)
+
+  | Every_days i ->
+    let t = Date.from_unixfloat t in
+    let t0 = Date.make 1970 1 1 in
+
+    (* Number of whole days since Unix Epoch. *)
+    let nb_days = Date.Period.safe_nb_days (Date.sub t t0) in
+
+    let nb_days = round_up nb_days i in
+    let t' = Date.add t0 (Date.Period.day nb_days) in
+    Date.to_unixfloat t'
+
+  | Every_months i ->
+    (* Calculate number of whole months since Unix Epoch. *)
+    let tm = gmtime t in
+    let months = 12 * (tm.tm_year - 70) + tm.tm_mon in
+
+    let months = round_up months i in
+    let t0 = Date.make 1970 1 1 in
+    let t' = Date.add t0 (Date.Period.month months) in
+    Date.to_unixfloat t'
index c935906..2d5bbfa 100644 (file)
@@ -166,3 +166,38 @@ val job_evaluate : job -> variables -> bool * job
     possibly-updated [job] structure.
 
     This is a no-op for 'every' jobs. *)
+
+val next_periodexpr : float -> periodexpr -> float
+(** [next_periodexpr t period] returns the earliest event of [period]
+    strictly after time [t].
+
+    Visualising periods as repeated events on a timeline, this
+    returns [t']:
+
+    {v
+    events:  ---+---------+---------+---------+---------+---------+-----
+    times:          t     t'
+    }
+
+    Note that [periodexpr] events are not necessarily regular.
+    eg. The start of a month is not a fixed number of seconds
+    after the start of the previous month.  'Epoch' refers
+    to the Unix Epoch (ie. 1970-01-01 00:00:00 UTC).
+
+    If [period = Every_seconds i] then events are when
+    [t' mod i == 0] when t' is the number of seconds since
+    the Epoch.  This returns the next t' > t.
+
+    If [period = Every_days i] then events happen at
+    midnight UTC every [i] days since the Epoch.
+    This returns the next midnight > t.
+
+    If [period = Every_months i] then events happen at
+    midnight UTC on the 1st day of the month every [i] months
+    since the Epoch.  This returns midnight on the
+    1st day of the next month > t.
+
+    If [period = Every_years i] then events happen at
+    midnight UTC on the 1st day of the year when
+    [(y - 1970) mod i == 0].  This returns midnight on the
+    1st day of the next year > t. *)
index 1941e22..506bc1a 100644 (file)
@@ -17,9 +17,9 @@
 
 bin_SCRIPTS = whenjobs
 
-OCAMLPACKAGES = -package unix,num,camlp4.lib,rpc
+OCAMLPACKAGES = -package unix,num,camlp4.lib,calendar,rpc -I ../lib
 
-OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES) -I ../lib
+OCAMLCFLAGS = -g -warn-error CDEFLMPSUVYZX $(OCAMLPACKAGES)
 OCAMLOPTFLAGS = $(OCAMLCFLAGS)
 
 # These should be in alphabetical order.