X-Git-Url: http://git.annexia.org/?a=blobdiff_plain;f=src%2Fproto.c;h=549734b31f502e9da7b6ecbfb563ecdf800cc3cc;hb=6d6b7edd1102f8383643866bf358e494e0d518ef;hp=ad173c6b4263818ff860cc63e16229f77d18d818;hpb=737181bcd7b1de8c3a613d6282030c34efa78fb6;p=libguestfs.git diff --git a/src/proto.c b/src/proto.c index ad173c6..549734b 100644 --- a/src/proto.c +++ b/src/proto.c @@ -70,6 +70,9 @@ #include "guestfs-internal-actions.h" #include "guestfs_protocol.h" +/* Size of guestfs_progress message on the wire. */ +#define PROGRESS_MESSAGE_SIZE 24 + /* This is the code used to send and receive RPC messages and (for * certain types of message) to perform file transfers. This code is * driven from the generated actions (src/actions.c). There @@ -180,8 +183,8 @@ child_cleanup (guestfs_h *g) if (g->recoverypid > 0) kill (g->recoverypid, 9); waitpid (g->pid, NULL, 0); if (g->recoverypid > 0) waitpid (g->recoverypid, NULL, 0); - close (g->fd[0]); - close (g->fd[1]); + if (g->fd[0] >= 0) close (g->fd[0]); + if (g->fd[1] >= 0) close (g->fd[1]); close (g->sock); g->fd[0] = -1; g->fd[1] = -1; @@ -245,11 +248,56 @@ read_log_message_or_eof (guestfs_h *g, int fd, int error_if_eof) return 0; } +/* Read 'n' bytes, setting the socket to blocking temporarily so + * that we really read the number of bytes requested. + * Returns: 0 == EOF while reading + * -1 == error, error() function has been called + * n == read 'n' bytes in full + */ +static ssize_t +really_read_from_socket (guestfs_h *g, int sock, char *buf, size_t n) +{ + long flags; + ssize_t r; + size_t got; + + /* Set socket to blocking. */ + flags = fcntl (sock, F_GETFL); + if (flags == -1) { + perrorf (g, "fcntl"); + return -1; + } + if (fcntl (sock, F_SETFL, flags & ~O_NONBLOCK) == -1) { + perrorf (g, "fcntl"); + return -1; + } + + got = 0; + while (got < n) { + r = read (sock, &buf[got], n-got); + if (r == -1) { + perrorf (g, "read"); + return -1; + } + if (r == 0) + return 0; /* EOF */ + got += r; + } + + /* Restore original socket flags. */ + if (fcntl (sock, F_SETFL, flags) == -1) { + perrorf (g, "fcntl"); + return -1; + } + + return (ssize_t) got; +} + static int check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd) { char buf[4]; - int n; + ssize_t n; uint32_t flag; XDR xdr; @@ -258,25 +306,46 @@ check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd) "check_for_daemon_cancellation_or_eof: %p g->state = %d, fd = %d\n", g, g->state, fd); - n = read (fd, buf, 4); + n = really_read_from_socket (g, fd, buf, 4); + if (n == -1) + return -1; if (n == 0) { /* Hopefully this indicates the qemu child process has died. */ child_cleanup (g); return -1; } - if (n == -1) { - if (errno == EINTR || errno == EAGAIN) - return 0; - - perrorf (g, "read"); - return -1; - } - xdrmem_create (&xdr, buf, 4, XDR_DECODE); xdr_uint32_t (&xdr, &flag); xdr_destroy (&xdr); + /* Read and process progress messages that happen during FileIn. */ + if (flag == GUESTFS_PROGRESS_FLAG) { + char buf[PROGRESS_MESSAGE_SIZE]; + + n = really_read_from_socket (g, fd, buf, PROGRESS_MESSAGE_SIZE); + if (n == -1) + return -1; + if (n == 0) { + child_cleanup (g); + return -1; + } + + if (g->state == BUSY && g->progress_cb) { + guestfs_progress message; + + xdrmem_create (&xdr, buf, PROGRESS_MESSAGE_SIZE, XDR_DECODE); + xdr_guestfs_progress (&xdr, &message); + xdr_destroy (&xdr); + + g->progress_cb (g, g->progress_cb_data, + message.proc, message.serial, + message.position, message.total); + } + + return 0; + } + if (flag != GUESTFS_CANCEL_FLAG) { error (g, _("check_for_daemon_cancellation_or_eof: read 0x%x from daemon, expected 0x%x\n"), flag, GUESTFS_CANCEL_FLAG); @@ -312,7 +381,8 @@ guestfs___send_to_daemon (guestfs_h *g, const void *v_buf, size_t n) FD_ZERO (&rset); FD_ZERO (&wset); - FD_SET (g->fd[1], &rset); /* Read qemu stdout for log messages & EOF. */ + if (g->fd[1] >= 0) /* Read qemu stdout for log messages & EOF. */ + FD_SET (g->fd[1], &rset); FD_SET (g->sock, &rset); /* Read socket for cancellation & EOF. */ FD_SET (g->sock, &wset); /* Write to socket to send the data. */ @@ -329,7 +399,7 @@ guestfs___send_to_daemon (guestfs_h *g, const void *v_buf, size_t n) return -1; } - if (FD_ISSET (g->fd[1], &rset2)) { + if (g->fd[1] >= 0 && FD_ISSET (g->fd[1], &rset2)) { if (read_log_message_or_eof (g, g->fd[1], 0) == -1) return -1; } @@ -373,7 +443,12 @@ guestfs___send_to_daemon (guestfs_h *g, const void *v_buf, size_t n) * * It also checks for EOF (qemu died) and passes that up through the * child_cleanup function above. + * + * Progress notifications are handled transparently by this function. + * If the callback exists, it is called. The caller of this function + * will not see GUESTFS_PROGRESS_FLAG. */ + int guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) { @@ -386,7 +461,8 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) FD_ZERO (&rset); - FD_SET (g->fd[1], &rset); /* Read qemu stdout for log messages & EOF. */ + if (g->fd[1] >= 0) /* Read qemu stdout for log messages & EOF. */ + FD_SET (g->fd[1], &rset); FD_SET (g->sock, &rset); /* Read socket for data & EOF. */ int max_fd = MAX (g->sock, g->fd[1]); @@ -400,7 +476,13 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) */ ssize_t nr = -4; - while (nr < (ssize_t) *size_rtn) { + for (;;) { + ssize_t message_size = + *size_rtn != GUESTFS_PROGRESS_FLAG ? + *size_rtn : PROGRESS_MESSAGE_SIZE; + if (nr >= message_size) + break; + rset2 = rset; int r = select (max_fd+1, &rset2, NULL, NULL, NULL); if (r == -1) { @@ -412,7 +494,7 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) return -1; } - if (FD_ISSET (g->fd[1], &rset2)) { + if (g->fd[1] >= 0 && FD_ISSET (g->fd[1], &rset2)) { if (read_log_message_or_eof (g, g->fd[1], 0) == -1) { free (*buf_rtn); *buf_rtn = NULL; @@ -450,6 +532,11 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) xdr_uint32_t (&xdr, size_rtn); xdr_destroy (&xdr); + /* *size_rtn changed, recalculate message_size */ + message_size = + *size_rtn != GUESTFS_PROGRESS_FLAG ? + *size_rtn : PROGRESS_MESSAGE_SIZE; + if (*size_rtn == GUESTFS_LAUNCH_FLAG) { if (g->state != LAUNCHING) error (g, _("received magic signature from guestfsd, but in state %d"), @@ -463,6 +550,8 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) } else if (*size_rtn == GUESTFS_CANCEL_FLAG) return 0; + else if (*size_rtn == GUESTFS_PROGRESS_FLAG) + /*FALLTHROUGH*/; /* If this happens, it's pretty bad and we've probably lost * synchronization. */ @@ -473,11 +562,11 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) } /* Allocate the complete buffer, size now known. */ - *buf_rtn = safe_malloc (g, *size_rtn); + *buf_rtn = safe_malloc (g, message_size); /*FALLTHROUGH*/ } - size_t sizetoread = *size_rtn - nr; + size_t sizetoread = message_size - nr; if (sizetoread > BUFSIZ) sizetoread = BUFSIZ; r = read (g->sock, (char *) (*buf_rtn) + nr, sizetoread); @@ -524,6 +613,26 @@ guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn) } #endif + if (*size_rtn == GUESTFS_PROGRESS_FLAG) { + if (g->state == BUSY && g->progress_cb) { + guestfs_progress message; + XDR xdr; + xdrmem_create (&xdr, *buf_rtn, PROGRESS_MESSAGE_SIZE, XDR_DECODE); + xdr_guestfs_progress (&xdr, &message); + xdr_destroy (&xdr); + + g->progress_cb (g, g->progress_cb_data, + message.proc, message.serial, + message.position, message.total); + } + + free (*buf_rtn); + *buf_rtn = NULL; + + /* Process next message. */ + return guestfs___recv_from_daemon (g, size_rtn, buf_rtn); + } + return 0; } @@ -543,7 +652,8 @@ guestfs___accept_from_daemon (guestfs_h *g) FD_ZERO (&rset); - FD_SET (g->fd[1], &rset); /* Read qemu stdout for log messages & EOF. */ + if (g->fd[1] >= 0) /* Read qemu stdout for log messages & EOF. */ + FD_SET (g->fd[1], &rset); FD_SET (g->sock, &rset); /* Read socket for accept. */ int max_fd = MAX (g->sock, g->fd[1]); @@ -567,7 +677,7 @@ guestfs___accept_from_daemon (guestfs_h *g) return -1; } - if (FD_ISSET (g->fd[1], &rset2)) { + if (g->fd[1] >= 0 && FD_ISSET (g->fd[1], &rset2)) { if (read_log_message_or_eof (g, g->fd[1], 1) == -1) return -1; } @@ -586,7 +696,9 @@ guestfs___accept_from_daemon (guestfs_h *g) } int -guestfs___send (guestfs_h *g, int proc_nr, xdrproc_t xdrp, char *args) +guestfs___send (guestfs_h *g, int proc_nr, + uint64_t progress_hint, uint64_t optargs_bitmask, + xdrproc_t xdrp, char *args) { struct guestfs_message_header hdr; XDR xdr; @@ -617,6 +729,8 @@ guestfs___send (guestfs_h *g, int proc_nr, xdrproc_t xdrp, char *args) hdr.direction = GUESTFS_DIRECTION_CALL; hdr.serial = serial; hdr.status = GUESTFS_STATUS_OK; + hdr.progress_hint = progress_hint; + hdr.optargs_bitmask = optargs_bitmask; if (!xdr_guestfs_message_header (&xdr, &hdr)) { error (g, _("xdr_guestfs_message_header failed"));