#include "guestfs-internal-actions.h"
#include "guestfs_protocol.h"
+/* Size of guestfs_progress message on the wire. */
+#define PROGRESS_MESSAGE_SIZE 24
+
/* This is the code used to send and receive RPC messages and (for
* certain types of message) to perform file transfers. This code is
* driven from the generated actions (src/actions.c). There
return 0;
}
+/* Read 'n' bytes, setting the socket to blocking temporarily so
+ * that we really read the number of bytes requested.
+ * Returns: 0 == EOF while reading
+ * -1 == error, error() function has been called
+ * n == read 'n' bytes in full
+ */
+static ssize_t
+really_read_from_socket (guestfs_h *g, int sock, char *buf, size_t n)
+{
+ long flags;
+ ssize_t r;
+ size_t got;
+
+ /* Set socket to blocking. */
+ flags = fcntl (sock, F_GETFL);
+ if (flags == -1) {
+ perrorf (g, "fcntl");
+ return -1;
+ }
+ if (fcntl (sock, F_SETFL, flags & ~O_NONBLOCK) == -1) {
+ perrorf (g, "fcntl");
+ return -1;
+ }
+
+ got = 0;
+ while (got < n) {
+ r = read (sock, &buf[got], n-got);
+ if (r == -1) {
+ perrorf (g, "read");
+ return -1;
+ }
+ if (r == 0)
+ return 0; /* EOF */
+ got += r;
+ }
+
+ /* Restore original socket flags. */
+ if (fcntl (sock, F_SETFL, flags) == -1) {
+ perrorf (g, "fcntl");
+ return -1;
+ }
+
+ return (ssize_t) got;
+}
+
static int
check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd)
{
char buf[4];
- int n;
+ ssize_t n;
uint32_t flag;
XDR xdr;
"check_for_daemon_cancellation_or_eof: %p g->state = %d, fd = %d\n",
g, g->state, fd);
- n = read (fd, buf, 4);
+ n = really_read_from_socket (g, fd, buf, 4);
+ if (n == -1)
+ return -1;
if (n == 0) {
/* Hopefully this indicates the qemu child process has died. */
child_cleanup (g);
return -1;
}
- if (n == -1) {
- if (errno == EINTR || errno == EAGAIN)
- return 0;
-
- perrorf (g, "read");
- return -1;
- }
-
xdrmem_create (&xdr, buf, 4, XDR_DECODE);
xdr_uint32_t (&xdr, &flag);
xdr_destroy (&xdr);
+ /* Read and process progress messages that happen during FileIn. */
+ if (flag == GUESTFS_PROGRESS_FLAG) {
+ char buf[PROGRESS_MESSAGE_SIZE];
+
+ n = really_read_from_socket (g, fd, buf, PROGRESS_MESSAGE_SIZE);
+ if (n == -1)
+ return -1;
+ if (n == 0) {
+ child_cleanup (g);
+ return -1;
+ }
+
+ if (g->state == BUSY && g->progress_cb) {
+ guestfs_progress message;
+
+ xdrmem_create (&xdr, buf, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
+ xdr_guestfs_progress (&xdr, &message);
+ xdr_destroy (&xdr);
+
+ g->progress_cb (g, g->progress_cb_data,
+ message.proc, message.serial,
+ message.position, message.total);
+ }
+
+ return 0;
+ }
+
if (flag != GUESTFS_CANCEL_FLAG) {
error (g, _("check_for_daemon_cancellation_or_eof: read 0x%x from daemon, expected 0x%x\n"),
flag, GUESTFS_CANCEL_FLAG);
* child_cleanup function above.
*/
int
-guestfs__send_to_daemon (guestfs_h *g, const void *v_buf, size_t n)
+guestfs___send_to_daemon (guestfs_h *g, const void *v_buf, size_t n)
{
const char *buf = v_buf;
fd_set rset, rset2;
*
* It also checks for EOF (qemu died) and passes that up through the
* child_cleanup function above.
+ *
+ * Progress notifications are handled transparently by this function.
+ * If the callback exists, it is called. The caller of this function
+ * will not see GUESTFS_PROGRESS_FLAG.
*/
+
int
-guestfs__recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
+guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
{
fd_set rset, rset2;
*/
ssize_t nr = -4;
- while (nr < (ssize_t) *size_rtn) {
+ for (;;) {
+ ssize_t message_size =
+ *size_rtn != GUESTFS_PROGRESS_FLAG ?
+ *size_rtn : PROGRESS_MESSAGE_SIZE;
+ if (nr >= message_size)
+ break;
+
rset2 = rset;
int r = select (max_fd+1, &rset2, NULL, NULL, NULL);
if (r == -1) {
xdr_uint32_t (&xdr, size_rtn);
xdr_destroy (&xdr);
+ /* *size_rtn changed, recalculate message_size */
+ message_size =
+ *size_rtn != GUESTFS_PROGRESS_FLAG ?
+ *size_rtn : PROGRESS_MESSAGE_SIZE;
+
if (*size_rtn == GUESTFS_LAUNCH_FLAG) {
if (g->state != LAUNCHING)
error (g, _("received magic signature from guestfsd, but in state %d"),
}
else if (*size_rtn == GUESTFS_CANCEL_FLAG)
return 0;
+ else if (*size_rtn == GUESTFS_PROGRESS_FLAG)
+ /*FALLTHROUGH*/;
/* If this happens, it's pretty bad and we've probably lost
* synchronization.
*/
}
/* Allocate the complete buffer, size now known. */
- *buf_rtn = safe_malloc (g, *size_rtn);
+ *buf_rtn = safe_malloc (g, message_size);
/*FALLTHROUGH*/
}
- size_t sizetoread = *size_rtn - nr;
+ size_t sizetoread = message_size - nr;
if (sizetoread > BUFSIZ) sizetoread = BUFSIZ;
r = read (g->sock, (char *) (*buf_rtn) + nr, sizetoread);
}
#endif
+ if (*size_rtn == GUESTFS_PROGRESS_FLAG) {
+ if (g->state == BUSY && g->progress_cb) {
+ guestfs_progress message;
+ XDR xdr;
+ xdrmem_create (&xdr, *buf_rtn, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
+ xdr_guestfs_progress (&xdr, &message);
+ xdr_destroy (&xdr);
+
+ g->progress_cb (g, g->progress_cb_data,
+ message.proc, message.serial,
+ message.position, message.total);
+ }
+
+ free (*buf_rtn);
+ *buf_rtn = NULL;
+
+ /* Process next message. */
+ return guestfs___recv_from_daemon (g, size_rtn, buf_rtn);
+ }
+
return 0;
}
* accepted socket.
*/
int
-guestfs__accept_from_daemon (guestfs_h *g)
+guestfs___accept_from_daemon (guestfs_h *g)
{
fd_set rset, rset2;
}
int
-guestfs___send (guestfs_h *g, int proc_nr, xdrproc_t xdrp, char *args)
+guestfs___send (guestfs_h *g, int proc_nr,
+ uint64_t progress_hint, uint64_t optargs_bitmask,
+ xdrproc_t xdrp, char *args)
{
struct guestfs_message_header hdr;
XDR xdr;
hdr.direction = GUESTFS_DIRECTION_CALL;
hdr.serial = serial;
hdr.status = GUESTFS_STATUS_OK;
+ hdr.progress_hint = progress_hint;
+ hdr.optargs_bitmask = optargs_bitmask;
if (!xdr_guestfs_message_header (&xdr, &hdr)) {
error (g, _("xdr_guestfs_message_header failed"));
xdr_uint32_t (&xdr, &len);
again:
- r = guestfs__send_to_daemon (g, msg_out, msg_out_size);
+ r = guestfs___send_to_daemon (g, msg_out, msg_out_size);
if (r == -2) /* Ignore stray daemon cancellations. */
goto again;
if (r == -1)
xdrmem_create (&xdr, msg_out, 4, XDR_ENCODE);
xdr_uint32_t (&xdr, &len);
- r = guestfs__send_to_daemon (g, msg_out, msg_out_size);
+ r = guestfs___send_to_daemon (g, msg_out, msg_out_size);
/* Did the daemon send a cancellation message? */
if (r == -2) {
int r;
again:
- r = guestfs__recv_from_daemon (g, &size, &buf);
+ r = guestfs___recv_from_daemon (g, &size, &buf);
if (r == -1)
return -1;
XDR xdr;
guestfs_chunk chunk;
- r = guestfs__recv_from_daemon (g, &len, &buf);
+ r = guestfs___recv_from_daemon (g, &len, &buf);
if (r == -1) {
error (g, _("receive_file_data: parse error in reply callback"));
return -1;