Implement progress messages in the daemon and library.
authorRichard Jones <rjones@redhat.com>
Sat, 28 Aug 2010 09:33:24 +0000 (10:33 +0100)
committerRichard Jones <rjones@redhat.com>
Tue, 31 Aug 2010 18:27:34 +0000 (19:27 +0100)
This implements progress notification messages in the daemon, and
adds a callback in the library to handle them.

No calls are changed so far, so in fact no progress messages can
be generated by this commit.

For more details, see:
https://www.redhat.com/archives/libguestfs/2010-July/msg00003.html
https://www.redhat.com/archives/libguestfs/2010-July/msg00024.html

daemon/daemon.h
daemon/proto.c
src/generator.ml
src/guestfs-internal.h
src/guestfs.c
src/guestfs.h
src/guestfs.pod
src/proto.c

index 4c1b9b0..03e0d37 100644 (file)
@@ -21,6 +21,7 @@
 
 #include <stdio.h>
 #include <stdarg.h>
 
 #include <stdio.h>
 #include <stdarg.h>
+#include <stdint.h>
 #include <errno.h>
 #include <unistd.h>
 
 #include <errno.h>
 #include <unistd.h>
 
@@ -161,6 +162,12 @@ extern int send_file_end (int cancel);
 /* only call this if there is a FileOut parameter */
 extern void reply (xdrproc_t xdrp, char *ret);
 
 /* only call this if there is a FileOut parameter */
 extern void reply (xdrproc_t xdrp, char *ret);
 
+/* Notify progress to caller.  This function is self-rate-limiting so
+ * you can call it as often as necessary.  Actions which call this
+ * should add 'Progress' note in generator.
+ */
+extern void notify_progress (uint64_t position, uint64_t total);
+
 /* Helper for functions that need a root filesystem mounted.
  * NB. Cannot be used for FileIn functions.
  */
 /* Helper for functions that need a root filesystem mounted.
  * NB. Cannot be used for FileIn functions.
  */
index 628e86c..02ee692 100644 (file)
@@ -26,6 +26,7 @@
 #include <errno.h>
 #include <sys/param.h>         /* defines MIN */
 #include <sys/select.h>
 #include <errno.h>
 #include <sys/param.h>         /* defines MIN */
 #include <sys/select.h>
+#include <sys/time.h>
 #include <rpc/types.h>
 #include <rpc/xdr.h>
 
 #include <rpc/types.h>
 #include <rpc/xdr.h>
 
 int proc_nr;
 int serial;
 
 int proc_nr;
 int serial;
 
+/* Time at which we received the current request. */
+static struct timeval start_t;
+
+/* Time at which the last progress notification was sent. */
+static struct timeval last_progress_t;
+
+/* Counts the number of progress notifications sent during this call. */
+static int count_progress;
+
 /* The daemon communications socket. */
 static int sock;
 
 /* The daemon communications socket. */
 static int sock;
 
@@ -54,8 +64,6 @@ main_loop (int _sock)
   char lenbuf[4];
   uint32_t len;
   struct guestfs_message_header hdr;
   char lenbuf[4];
   uint32_t len;
   struct guestfs_message_header hdr;
-  struct timeval start_t, end_t;
-  int64_t start_us, end_us, elapsed_us;
 
   sock = _sock;
 
 
   sock = _sock;
 
@@ -112,9 +120,9 @@ main_loop (int _sock)
     }
 #endif
 
     }
 #endif
 
-    /* In verbose mode, display the time taken to run each command. */
-    if (verbose)
-      gettimeofday (&start_t, NULL);
+    gettimeofday (&start_t, NULL);
+    last_progress_t = start_t;
+    count_progress = 0;
 
     /* Decode the message header. */
     xdrmem_create (&xdr, buf, len, XDR_DECODE);
 
     /* Decode the message header. */
     xdrmem_create (&xdr, buf, len, XDR_DECODE);
@@ -160,11 +168,14 @@ main_loop (int _sock)
 
     /* In verbose mode, display the time taken to run each command. */
     if (verbose) {
 
     /* In verbose mode, display the time taken to run each command. */
     if (verbose) {
+      struct timeval end_t;
       gettimeofday (&end_t, NULL);
 
       gettimeofday (&end_t, NULL);
 
+      int64_t start_us, end_us, elapsed_us;
       start_us = (int64_t) start_t.tv_sec * 1000000 + start_t.tv_usec;
       end_us = (int64_t) end_t.tv_sec * 1000000 + end_t.tv_usec;
       elapsed_us = end_us - start_us;
       start_us = (int64_t) start_t.tv_sec * 1000000 + start_t.tv_usec;
       end_us = (int64_t) end_t.tv_sec * 1000000 + end_t.tv_usec;
       elapsed_us = end_us - start_us;
+
       fprintf (stderr, "proc %d (%s) took %d.%02d seconds\n",
                proc_nr,
                proc_nr >= 0 && proc_nr < GUESTFS_PROC_NR_PROCS
       fprintf (stderr, "proc %d (%s) took %d.%02d seconds\n",
                proc_nr,
                proc_nr >= 0 && proc_nr < GUESTFS_PROC_NR_PROCS
@@ -533,3 +544,78 @@ send_chunk (const guestfs_chunk *chunk)
 
   return err;
 }
 
   return err;
 }
+
+/* Initial delay before sending notification messages, and
+ * the period at which we send them thereafter.  These times
+ * are in microseconds.
+ */
+#define NOTIFICATION_INITIAL_DELAY 2000000
+#define NOTIFICATION_PERIOD         333333
+
+void
+notify_progress (uint64_t position, uint64_t total)
+{
+  struct timeval now_t;
+  gettimeofday (&now_t, NULL);
+
+  /* Always send a notification at 100%.  This simplifies callers by
+   * allowing them to 'finish' the progress bar at 100% without
+   * needing special code.
+   */
+  if (count_progress > 0 && position == total)
+    goto send;
+
+  /* Calculate time in microseconds since the last progress message
+   * was sent out (or since the start of the call).
+   */
+  int64_t last_us, now_us, elapsed_us;
+  last_us =
+    (int64_t) last_progress_t.tv_sec * 1000000 + last_progress_t.tv_usec;
+  now_us = (int64_t) now_t.tv_sec * 1000000 + now_t.tv_usec;
+  elapsed_us = now_us - last_us;
+
+  /* Rate limit. */
+  if ((count_progress == 0 && elapsed_us < NOTIFICATION_INITIAL_DELAY) ||
+      (count_progress > 0 && elapsed_us < NOTIFICATION_PERIOD))
+    return;
+
+ send:
+  /* We're going to send a message now ... */
+  count_progress++;
+  last_progress_t = now_t;
+
+  /* Send the header word. */
+  XDR xdr;
+  char buf[128];
+  uint32_t i = GUESTFS_PROGRESS_FLAG;
+  size_t len;
+  xdrmem_create (&xdr, buf, 4, XDR_ENCODE);
+  xdr_u_int (&xdr, &i);
+  xdr_destroy (&xdr);
+
+  if (xwrite (sock, buf, 4) == -1) {
+    fprintf (stderr, "xwrite failed\n");
+    exit (EXIT_FAILURE);
+  }
+
+  guestfs_progress message = {
+    .proc = proc_nr,
+    .serial = serial,
+    .position = position,
+    .total = total,
+  };
+
+  xdrmem_create (&xdr, buf, sizeof buf, XDR_ENCODE);
+  if (!xdr_guestfs_progress (&xdr, &message)) {
+    fprintf (stderr, "xdr_guestfs_progress: failed to encode message\n");
+    xdr_destroy (&xdr);
+    return;
+  }
+  len = xdr_getpos (&xdr);
+  xdr_destroy (&xdr);
+
+  if (xwrite (sock, buf, len) == -1) {
+    fprintf (stderr, "xwrite failed\n");
+    exit (EXIT_FAILURE);
+  }
+}
index c25c871..bbf313a 100755 (executable)
@@ -6327,11 +6327,12 @@ and generate_xdr () =
  */
 
 const GUESTFS_PROGRAM = 0x2000F5F5;
  */
 
 const GUESTFS_PROGRAM = 0x2000F5F5;
-const GUESTFS_PROTOCOL_VERSION = 1;
+const GUESTFS_PROTOCOL_VERSION = 2;
 
 /* These constants must be larger than any possible message length. */
 const GUESTFS_LAUNCH_FLAG = 0xf5f55ff5;
 const GUESTFS_CANCEL_FLAG = 0xffffeeee;
 
 /* These constants must be larger than any possible message length. */
 const GUESTFS_LAUNCH_FLAG = 0xf5f55ff5;
 const GUESTFS_CANCEL_FLAG = 0xffffeeee;
+const GUESTFS_PROGRESS_FLAG = 0xffff5555;
 
 enum guestfs_message_direction {
   GUESTFS_DIRECTION_CALL = 0,        /* client -> daemon */
 
 enum guestfs_message_direction {
   GUESTFS_DIRECTION_CALL = 0,        /* client -> daemon */
@@ -6370,6 +6371,23 @@ struct guestfs_chunk {
   /* data size is 0 bytes if the transfer has finished successfully */
   opaque data<GUESTFS_MAX_CHUNK_SIZE>;
 };
   /* data size is 0 bytes if the transfer has finished successfully */
   opaque data<GUESTFS_MAX_CHUNK_SIZE>;
 };
+
+/* Progress notifications.  Daemon self-limits these messages to
+ * at most one per second.  The daemon can send these messages
+ * at any time, and the caller should discard unexpected messages.
+ * 'position' and 'total' have undefined units; however they may
+ * have meaning for some calls.
+ *
+ * NB. guestfs___recv_from_daemon assumes the XDR-encoded
+ * structure is 24 bytes long.
+ */
+struct guestfs_progress {
+  guestfs_procedure proc;            /* @0:  GUESTFS_PROC_x */
+  unsigned serial;                   /* @4:  message serial number */
+  unsigned hyper position;           /* @8:  0 <= position <= total */
+  unsigned hyper total;              /* @16: total size of operation */
+                                     /* @24: size of structure */
+};
 "
 
 (* Generate the guestfs-structs.h file. *)
 "
 
 (* Generate the guestfs-structs.h file. *)
@@ -6869,6 +6887,7 @@ and generate_linker_script () =
     "guestfs_set_launch_done_callback";
     "guestfs_set_log_message_callback";
     "guestfs_set_out_of_memory_handler";
     "guestfs_set_launch_done_callback";
     "guestfs_set_log_message_callback";
     "guestfs_set_out_of_memory_handler";
+    "guestfs_set_progress_callback";
     "guestfs_set_subprocess_quit_callback";
 
     (* Unofficial parts of the API: the bindings code use these
     "guestfs_set_subprocess_quit_callback";
 
     (* Unofficial parts of the API: the bindings code use these
index e37c9c2..32a6c2a 100644 (file)
@@ -122,6 +122,8 @@ struct guestfs_h
   void *                     launch_done_cb_data;
   guestfs_close_cb           close_cb;
   void *                     close_cb_data;
   void *                     launch_done_cb_data;
   guestfs_close_cb           close_cb;
   void *                     close_cb_data;
+  guestfs_progress_cb        progress_cb;
+  void *                     progress_cb_data;
 
   int msg_next_serial;
 
 
   int msg_next_serial;
 
index eaacd39..206347e 100644 (file)
@@ -645,3 +645,11 @@ guestfs_set_close_callback (guestfs_h *g,
   g->close_cb = cb;
   g->close_cb_data = opaque;
 }
   g->close_cb = cb;
   g->close_cb_data = opaque;
 }
+
+void
+guestfs_set_progress_callback (guestfs_h *g,
+                               guestfs_progress_cb cb, void *opaque)
+{
+  g->progress_cb = cb;
+  g->progress_cb_data = opaque;
+}
index 3cff484..ec88f22 100644 (file)
@@ -34,6 +34,8 @@
 extern "C" {
 #endif
 
 extern "C" {
 #endif
 
+#include <stdint.h>
+
 typedef struct guestfs_h guestfs_h;
 
 /*--- Connection management ---*/
 typedef struct guestfs_h guestfs_h;
 
 /*--- Connection management ---*/
@@ -57,14 +59,15 @@ typedef void (*guestfs_log_message_cb) (guestfs_h *g, void *data, char *buf, int
 typedef void (*guestfs_subprocess_quit_cb) (guestfs_h *g, void *data);
 typedef void (*guestfs_launch_done_cb) (guestfs_h *g, void *data);
 typedef void (*guestfs_close_cb) (guestfs_h *g, void *data);
 typedef void (*guestfs_subprocess_quit_cb) (guestfs_h *g, void *data);
 typedef void (*guestfs_launch_done_cb) (guestfs_h *g, void *data);
 typedef void (*guestfs_close_cb) (guestfs_h *g, void *data);
+typedef void (*guestfs_progress_cb) (guestfs_h *g, void *data, int proc_nr, int serial, uint64_t position, uint64_t total);
 
 extern void guestfs_set_log_message_callback (guestfs_h *g, guestfs_log_message_cb cb, void *opaque);
 extern void guestfs_set_subprocess_quit_callback (guestfs_h *g, guestfs_subprocess_quit_cb cb, void *opaque);
 extern void guestfs_set_launch_done_callback (guestfs_h *g, guestfs_launch_done_cb cb, void *opaque);
 extern void guestfs_set_close_callback (guestfs_h *g, guestfs_close_cb cb, void *opaque);
 
 extern void guestfs_set_log_message_callback (guestfs_h *g, guestfs_log_message_cb cb, void *opaque);
 extern void guestfs_set_subprocess_quit_callback (guestfs_h *g, guestfs_subprocess_quit_cb cb, void *opaque);
 extern void guestfs_set_launch_done_callback (guestfs_h *g, guestfs_launch_done_cb cb, void *opaque);
 extern void guestfs_set_close_callback (guestfs_h *g, guestfs_close_cb cb, void *opaque);
+extern void guestfs_set_progress_callback (guestfs_h *g, guestfs_progress_cb cb, void *opaque);
 
 /*--- Structures and actions ---*/
 
 /*--- Structures and actions ---*/
-#include <stdint.h>
 #include <rpc/types.h>
 #include <rpc/xdr.h>
 #include <guestfs-structs.h>
 #include <rpc/types.h>
 #include <rpc/xdr.h>
 #include <guestfs-structs.h>
index 590c768..6a956ed 100644 (file)
@@ -1186,6 +1186,56 @@ languages (eg. if your HLL interpreter has already been cleaned
 up by the time this is called, and if your callback then jumps
 into some HLL function).
 
 up by the time this is called, and if your callback then jumps
 into some HLL function).
 
+=head2 guestfs_set_progress_callback
+
+ typedef void (*guestfs_progress_cb) (guestfs_h *g, void *opaque,
+                                      int proc_nr, int serial,
+                                      uint64_t position, uint64_t total);
+ void guestfs_set_progress_callback (guestfs_h *g,
+                                     guestfs_progress_cb cb,
+                                     void *opaque);
+
+Some long-running operations can generate progress messages.  If
+this callback is registered, then it will be called each time a
+progress message is generated (usually two seconds after the
+operation started, and three times per second thereafter until
+it completes, although the frequency may change in future versions).
+
+The callback receives two numbers: C<position> and C<total>.
+The units of C<total> are not defined, although for some
+operations C<total> may relate in some way to the amount of
+data to be transferred (eg. in bytes or megabytes), and
+C<position> may be the portion which has been transferred.
+
+The only defined and stable parts of the API are:
+
+=over 4
+
+=item *
+
+The callback can display to the user some type of progress bar or
+indicator which shows the ratio of C<position>:C<total>.
+
+=item *
+
+0 E<lt>= C<position> E<lt>= C<total>
+
+=item *
+
+If any progress notification is sent during a call, then a final
+progress notification is always sent when C<position> = C<total>.
+
+This is to simplify caller code, so callers can easily set the
+progress indicator to "100%" at the end of the operation, without
+requiring special code to detect this case.
+
+=back
+
+The callback also receives the procedure number and serial number of
+the call.  These are only useful for debugging protocol issues, and
+the callback can normally ignore them.  The callback may want to
+print these numbers in error messages or debugging messages.
+
 =head1 BLOCK DEVICE NAMING
 
 In the kernel there is now quite a profusion of schemata for naming
 =head1 BLOCK DEVICE NAMING
 
 In the kernel there is now quite a profusion of schemata for naming
index ad173c6..5d924e8 100644 (file)
@@ -373,7 +373,15 @@ guestfs___send_to_daemon (guestfs_h *g, const void *v_buf, size_t n)
  *
  * It also checks for EOF (qemu died) and passes that up through the
  * child_cleanup function above.
  *
  * It also checks for EOF (qemu died) and passes that up through the
  * child_cleanup function above.
+ *
+ * Progress notifications are handled transparently by this function.
+ * If the callback exists, it is called.  The caller of this function
+ * will not see GUESTFS_PROGRESS_FLAG.
  */
  */
+
+/* Size of guestfs_progress message on the wire. */
+#define PROGRESS_MESSAGE_SIZE 24
+
 int
 guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
 {
 int
 guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
 {
@@ -400,7 +408,13 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
    */
   ssize_t nr = -4;
 
    */
   ssize_t nr = -4;
 
-  while (nr < (ssize_t) *size_rtn) {
+  for (;;) {
+    ssize_t message_size =
+      *size_rtn != GUESTFS_PROGRESS_FLAG ?
+      *size_rtn : PROGRESS_MESSAGE_SIZE;
+    if (nr >= message_size)
+      break;
+
     rset2 = rset;
     int r = select (max_fd+1, &rset2, NULL, NULL, NULL);
     if (r == -1) {
     rset2 = rset;
     int r = select (max_fd+1, &rset2, NULL, NULL, NULL);
     if (r == -1) {
@@ -450,6 +464,11 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
         xdr_uint32_t (&xdr, size_rtn);
         xdr_destroy (&xdr);
 
         xdr_uint32_t (&xdr, size_rtn);
         xdr_destroy (&xdr);
 
+        /* *size_rtn changed, recalculate message_size */
+        message_size =
+          *size_rtn != GUESTFS_PROGRESS_FLAG ?
+          *size_rtn : PROGRESS_MESSAGE_SIZE;
+
         if (*size_rtn == GUESTFS_LAUNCH_FLAG) {
           if (g->state != LAUNCHING)
             error (g, _("received magic signature from guestfsd, but in state %d"),
         if (*size_rtn == GUESTFS_LAUNCH_FLAG) {
           if (g->state != LAUNCHING)
             error (g, _("received magic signature from guestfsd, but in state %d"),
@@ -463,6 +482,8 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
         }
         else if (*size_rtn == GUESTFS_CANCEL_FLAG)
           return 0;
         }
         else if (*size_rtn == GUESTFS_CANCEL_FLAG)
           return 0;
+        else if (*size_rtn == GUESTFS_PROGRESS_FLAG)
+          /*FALLTHROUGH*/;
         /* If this happens, it's pretty bad and we've probably lost
          * synchronization.
          */
         /* If this happens, it's pretty bad and we've probably lost
          * synchronization.
          */
@@ -473,11 +494,11 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
         }
 
         /* Allocate the complete buffer, size now known. */
         }
 
         /* Allocate the complete buffer, size now known. */
-        *buf_rtn = safe_malloc (g, *size_rtn);
+        *buf_rtn = safe_malloc (g, message_size);
         /*FALLTHROUGH*/
       }
 
         /*FALLTHROUGH*/
       }
 
-      size_t sizetoread = *size_rtn - nr;
+      size_t sizetoread = message_size - nr;
       if (sizetoread > BUFSIZ) sizetoread = BUFSIZ;
 
       r = read (g->sock, (char *) (*buf_rtn) + nr, sizetoread);
       if (sizetoread > BUFSIZ) sizetoread = BUFSIZ;
 
       r = read (g->sock, (char *) (*buf_rtn) + nr, sizetoread);
@@ -524,6 +545,26 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
   }
 #endif
 
   }
 #endif
 
+  if (*size_rtn == GUESTFS_PROGRESS_FLAG) {
+    if (g->state == BUSY && g->progress_cb) {
+      guestfs_progress message;
+      XDR xdr;
+      xdrmem_create (&xdr, *buf_rtn, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
+      xdr_guestfs_progress (&xdr, &message);
+      xdr_destroy (&xdr);
+
+      g->progress_cb (g, g->progress_cb_data,
+                      message.proc, message.serial,
+                      message.position, message.total);
+    }
+
+    free (*buf_rtn);
+    *buf_rtn = NULL;
+
+    /* Process next message. */
+    return guestfs___recv_from_daemon (g, size_rtn, buf_rtn);
+  }
+
   return 0;
 }
 
   return 0;
 }