New event API (RHBZ#664558).
[libguestfs.git] / src / proto.c
1 /* libguestfs
2  * Copyright (C) 2009-2010 Red Hat Inc.
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18
19 #include <config.h>
20
21 #define _BSD_SOURCE /* for mkdtemp, usleep */
22
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <stdarg.h>
26 #include <stddef.h>
27 #include <stdint.h>
28 #include <inttypes.h>
29 #include <unistd.h>
30 #include <string.h>
31 #include <fcntl.h>
32 #include <time.h>
33 #include <sys/stat.h>
34 #include <sys/select.h>
35 #include <dirent.h>
36 #include <signal.h>
37
38 #include <rpc/types.h>
39 #include <rpc/xdr.h>
40
41 #ifdef HAVE_ERRNO_H
42 #include <errno.h>
43 #endif
44
45 #ifdef HAVE_SYS_TYPES_H
46 #include <sys/types.h>
47 #endif
48
49 #ifdef HAVE_SYS_WAIT_H
50 #include <sys/wait.h>
51 #endif
52
53 #ifdef HAVE_SYS_SOCKET_H
54 #include <sys/socket.h>
55 #endif
56
57 #ifdef HAVE_SYS_UN_H
58 #include <sys/un.h>
59 #endif
60
61 #include <arpa/inet.h>
62 #include <netinet/in.h>
63
64 #include "c-ctype.h"
65 #include "glthread/lock.h"
66 #include "ignore-value.h"
67
68 #include "guestfs.h"
69 #include "guestfs-internal.h"
70 #include "guestfs-internal-actions.h"
71 #include "guestfs_protocol.h"
72
73 /* Size of guestfs_progress message on the wire. */
74 #define PROGRESS_MESSAGE_SIZE 24
75
76 /* This is the code used to send and receive RPC messages and (for
77  * certain types of message) to perform file transfers.  This code is
78  * driven from the generated actions (src/actions.c).  There
79  * are five different cases to consider:
80  *
81  * (1) A non-daemon function.  There is no RPC involved at all, it's
82  * all handled inside the library.
83  *
84  * (2) A simple RPC (eg. "mount").  We write the request, then read
85  * the reply.  The sequence of calls is:
86  *
87  *   guestfs___set_busy
88  *   guestfs___send
89  *   guestfs___recv
90  *   guestfs___end_busy
91  *
92  * (3) An RPC with FileOut parameters (eg. "upload").  We write the
93  * request, then write the file(s), then read the reply.  The sequence
94  * of calls is:
95  *
96  *   guestfs___set_busy
97  *   guestfs___send
98  *   guestfs___send_file  (possibly multiple times)
99  *   guestfs___recv
100  *   guestfs___end_busy
101  *
102  * (4) An RPC with FileIn parameters (eg. "download").  We write the
103  * request, then read the reply, then read the file(s).  The sequence
104  * of calls is:
105  *
106  *   guestfs___set_busy
107  *   guestfs___send
108  *   guestfs___recv
109  *   guestfs___recv_file  (possibly multiple times)
110  *   guestfs___end_busy
111  *
112  * (5) Both FileOut and FileIn parameters.  There are no calls like
113  * this in the current API, but they would be implemented as a
114  * combination of cases (3) and (4).
115  *
116  * During all writes and reads, we also select(2) on qemu stdout
117  * looking for messages (guestfsd stderr and guest kernel dmesg), and
118  * anything received is passed up through the log_message_cb.  This is
119  * also the reason why all the sockets are non-blocking.  We also have
120  * to check for EOF (qemu died).  All of this is handled by the
121  * functions send_to_daemon and recv_from_daemon.
122  */
123
124 static int
125 xwrite (int fd, const void *v_buf, size_t len)
126 {
127   const char *buf = v_buf;
128   int r;
129
130   while (len > 0) {
131     r = write (fd, buf, len);
132     if (r == -1)
133       return -1;
134
135     buf += r;
136     len -= r;
137   }
138
139   return 0;
140 }
141
142 int
143 guestfs___set_busy (guestfs_h *g)
144 {
145   if (g->state != READY) {
146     error (g, _("guestfs_set_busy: called when in state %d != READY"),
147            g->state);
148     return -1;
149   }
150   g->state = BUSY;
151   return 0;
152 }
153
154 int
155 guestfs___end_busy (guestfs_h *g)
156 {
157   switch (g->state)
158     {
159     case BUSY:
160       g->state = READY;
161       break;
162     case CONFIG:
163     case READY:
164       break;
165
166     case LAUNCHING:
167     case NO_HANDLE:
168     default:
169       error (g, _("guestfs_end_busy: called when in state %d"), g->state);
170       return -1;
171     }
172   return 0;
173 }
174
175 /* This is called if we detect EOF, ie. qemu died. */
176 static void
177 child_cleanup (guestfs_h *g)
178 {
179   debug (g, "child_cleanup: %p: child process died", g);
180
181   /*if (g->pid > 0) kill (g->pid, SIGTERM);*/
182   if (g->recoverypid > 0) kill (g->recoverypid, 9);
183   waitpid (g->pid, NULL, 0);
184   if (g->recoverypid > 0) waitpid (g->recoverypid, NULL, 0);
185   if (g->fd[0] >= 0) close (g->fd[0]);
186   if (g->fd[1] >= 0) close (g->fd[1]);
187   close (g->sock);
188   g->fd[0] = -1;
189   g->fd[1] = -1;
190   g->sock = -1;
191   g->pid = 0;
192   g->recoverypid = 0;
193   memset (&g->launch_t, 0, sizeof g->launch_t);
194   g->state = CONFIG;
195   guestfs___call_callbacks_void (g, GUESTFS_EVENT_SUBPROCESS_QUIT);
196 }
197
198 static int
199 read_log_message_or_eof (guestfs_h *g, int fd, int error_if_eof)
200 {
201   char buf[BUFSIZ];
202   int n;
203
204 #if 0
205   debug (g, "read_log_message_or_eof: %p g->state = %d, fd = %d",
206          g, g->state, fd);
207 #endif
208
209   /* QEMU's console emulates a 16550A serial port.  The real 16550A
210    * device has a small FIFO buffer (16 bytes) which means here we see
211    * lots of small reads of 1-16 bytes in length, usually single
212    * bytes.
213    */
214   n = read (fd, buf, sizeof buf);
215   if (n == 0) {
216     /* Hopefully this indicates the qemu child process has died. */
217     child_cleanup (g);
218
219     if (error_if_eof) {
220       /* We weren't expecting eof here (called from launch) so place
221        * something in the error buffer.  RHBZ#588851.
222        */
223       error (g, "child process died unexpectedly");
224     }
225     return -1;
226   }
227
228   if (n == -1) {
229     if (errno == EINTR || errno == EAGAIN)
230       return 0;
231
232     perrorf (g, "read");
233     return -1;
234   }
235
236   /* It's an actual log message, send it upwards if anyone is listening. */
237   guestfs___call_callbacks_message (g, GUESTFS_EVENT_APPLIANCE, buf, n);
238
239   return 0;
240 }
241
242 /* Read 'n' bytes, setting the socket to blocking temporarily so
243  * that we really read the number of bytes requested.
244  * Returns:  0 == EOF while reading
245  *          -1 == error, error() function has been called
246  *           n == read 'n' bytes in full
247  */
248 static ssize_t
249 really_read_from_socket (guestfs_h *g, int sock, char *buf, size_t n)
250 {
251   long flags;
252   ssize_t r;
253   size_t got;
254
255   /* Set socket to blocking. */
256   flags = fcntl (sock, F_GETFL);
257   if (flags == -1) {
258     perrorf (g, "fcntl");
259     return -1;
260   }
261   if (fcntl (sock, F_SETFL, flags & ~O_NONBLOCK) == -1) {
262     perrorf (g, "fcntl");
263     return -1;
264   }
265
266   got = 0;
267   while (got < n) {
268     r = read (sock, &buf[got], n-got);
269     if (r == -1) {
270       perrorf (g, "read");
271       return -1;
272     }
273     if (r == 0)
274       return 0; /* EOF */
275     got += r;
276   }
277
278   /* Restore original socket flags. */
279   if (fcntl (sock, F_SETFL, flags) == -1) {
280     perrorf (g, "fcntl");
281     return -1;
282   }
283
284   return (ssize_t) got;
285 }
286
287 static void
288 send_progress_message (guestfs_h *g, const guestfs_progress *message)
289 {
290   uint64_t array[4];
291
292   array[0] = message->proc;
293   array[1] = message->serial;
294   array[2] = message->position;
295   array[3] = message->total;
296
297   guestfs___call_callbacks_array (g, GUESTFS_EVENT_PROGRESS,
298                                   array, sizeof array / sizeof array[0]);
299 }
300
301 static int
302 check_for_daemon_cancellation_or_eof (guestfs_h *g, int fd)
303 {
304   char buf[4];
305   ssize_t n;
306   uint32_t flag;
307   XDR xdr;
308
309   debug (g, "check_for_daemon_cancellation_or_eof: %p g->state = %d, fd = %d",
310          g, g->state, fd);
311
312   n = really_read_from_socket (g, fd, buf, 4);
313   if (n == -1)
314     return -1;
315   if (n == 0) {
316     /* Hopefully this indicates the qemu child process has died. */
317     child_cleanup (g);
318     return -1;
319   }
320
321   xdrmem_create (&xdr, buf, 4, XDR_DECODE);
322   xdr_uint32_t (&xdr, &flag);
323   xdr_destroy (&xdr);
324
325   /* Read and process progress messages that happen during FileIn. */
326   if (flag == GUESTFS_PROGRESS_FLAG) {
327     char buf[PROGRESS_MESSAGE_SIZE];
328
329     n = really_read_from_socket (g, fd, buf, PROGRESS_MESSAGE_SIZE);
330     if (n == -1)
331       return -1;
332     if (n == 0) {
333       child_cleanup (g);
334       return -1;
335     }
336
337     if (g->state == BUSY) {
338       guestfs_progress message;
339
340       xdrmem_create (&xdr, buf, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
341       xdr_guestfs_progress (&xdr, &message);
342       xdr_destroy (&xdr);
343
344       send_progress_message (g, &message);
345     }
346
347     return 0;
348   }
349
350   if (flag != GUESTFS_CANCEL_FLAG) {
351     error (g, _("check_for_daemon_cancellation_or_eof: read 0x%x from daemon, expected 0x%x\n"),
352            flag, GUESTFS_CANCEL_FLAG);
353     return -1;
354   }
355
356   return -2;
357 }
358
359 /* This writes the whole N bytes of BUF to the daemon socket.
360  *
361  * If the whole write is successful, it returns 0.
362  * If there was an error, it returns -1.
363  * If the daemon sent a cancellation message, it returns -2.
364  *
365  * It also checks qemu stdout for log messages and passes those up
366  * through log_message_cb.
367  *
368  * It also checks for EOF (qemu died) and passes that up through the
369  * child_cleanup function above.
370  */
371 int
372 guestfs___send_to_daemon (guestfs_h *g, const void *v_buf, size_t n)
373 {
374   const char *buf = v_buf;
375   fd_set rset, rset2;
376   fd_set wset, wset2;
377
378   debug (g, "send_to_daemon: %p g->state = %d, n = %zu", g, g->state, n);
379
380   FD_ZERO (&rset);
381   FD_ZERO (&wset);
382
383   if (g->fd[1] >= 0)            /* Read qemu stdout for log messages & EOF. */
384     FD_SET (g->fd[1], &rset);
385   FD_SET (g->sock, &rset);      /* Read socket for cancellation & EOF. */
386   FD_SET (g->sock, &wset);      /* Write to socket to send the data. */
387
388   int max_fd = MAX (g->sock, g->fd[1]);
389
390   while (n > 0) {
391     rset2 = rset;
392     wset2 = wset;
393     int r = select (max_fd+1, &rset2, &wset2, NULL, NULL);
394     if (r == -1) {
395       if (errno == EINTR || errno == EAGAIN)
396         continue;
397       perrorf (g, "select");
398       return -1;
399     }
400
401     if (g->fd[1] >= 0 && FD_ISSET (g->fd[1], &rset2)) {
402       if (read_log_message_or_eof (g, g->fd[1], 0) == -1)
403         return -1;
404     }
405     if (FD_ISSET (g->sock, &rset2)) {
406       r = check_for_daemon_cancellation_or_eof (g, g->sock);
407       if (r < 0)
408         return r;
409     }
410     if (FD_ISSET (g->sock, &wset2)) {
411       r = write (g->sock, buf, n);
412       if (r == -1) {
413         if (errno == EINTR || errno == EAGAIN)
414           continue;
415         perrorf (g, "write");
416         if (errno == EPIPE) /* Disconnected from guest (RHBZ#508713). */
417           child_cleanup (g);
418         return -1;
419       }
420       buf += r;
421       n -= r;
422     }
423   }
424
425   return 0;
426 }
427
428 /* This reads a single message, file chunk, launch flag or
429  * cancellation flag from the daemon.  If something was read, it
430  * returns 0, otherwise -1.
431  *
432  * Both size_rtn and buf_rtn must be passed by the caller as non-NULL.
433  *
434  * *size_rtn returns the size of the returned message or it may be
435  * GUESTFS_LAUNCH_FLAG or GUESTFS_CANCEL_FLAG.
436  *
437  * *buf_rtn is returned containing the message (if any) or will be set
438  * to NULL.  *buf_rtn must be freed by the caller.
439  *
440  * It also checks qemu stdout for log messages and passes those up
441  * through log_message_cb.
442  *
443  * It also checks for EOF (qemu died) and passes that up through the
444  * child_cleanup function above.
445  *
446  * Progress notifications are handled transparently by this function.
447  * If the callback exists, it is called.  The caller of this function
448  * will not see GUESTFS_PROGRESS_FLAG.
449  */
450
451 int
452 guestfs___recv_from_daemon (guestfs_h *g, uint32_t *size_rtn, void **buf_rtn)
453 {
454   fd_set rset, rset2;
455
456   debug (g, "recv_from_daemon: %p g->state = %d, size_rtn = %p, buf_rtn = %p",
457          g, g->state, size_rtn, buf_rtn);
458
459   FD_ZERO (&rset);
460
461   if (g->fd[1] >= 0)            /* Read qemu stdout for log messages & EOF. */
462     FD_SET (g->fd[1], &rset);
463   FD_SET (g->sock, &rset);      /* Read socket for data & EOF. */
464
465   int max_fd = MAX (g->sock, g->fd[1]);
466
467   *size_rtn = 0;
468   *buf_rtn = NULL;
469
470   char lenbuf[4];
471   /* nr is the size of the message, but we prime it as -4 because we
472    * have to read the message length word first.
473    */
474   ssize_t nr = -4;
475
476   for (;;) {
477     ssize_t message_size =
478       *size_rtn != GUESTFS_PROGRESS_FLAG ?
479       *size_rtn : PROGRESS_MESSAGE_SIZE;
480     if (nr >= message_size)
481       break;
482
483     rset2 = rset;
484     int r = select (max_fd+1, &rset2, NULL, NULL, NULL);
485     if (r == -1) {
486       if (errno == EINTR || errno == EAGAIN)
487         continue;
488       perrorf (g, "select");
489       free (*buf_rtn);
490       *buf_rtn = NULL;
491       return -1;
492     }
493
494     if (g->fd[1] >= 0 && FD_ISSET (g->fd[1], &rset2)) {
495       if (read_log_message_or_eof (g, g->fd[1], 0) == -1) {
496         free (*buf_rtn);
497         *buf_rtn = NULL;
498         return -1;
499       }
500     }
501     if (FD_ISSET (g->sock, &rset2)) {
502       if (nr < 0) {    /* Have we read the message length word yet? */
503         r = read (g->sock, lenbuf+nr+4, -nr);
504         if (r == -1) {
505           if (errno == EINTR || errno == EAGAIN)
506             continue;
507           int err = errno;
508           perrorf (g, "read");
509           /* Under some circumstances we see "Connection reset by peer"
510            * here when the child dies suddenly.  Catch this and call
511            * the cleanup function, same as for EOF.
512            */
513           if (err == ECONNRESET)
514             child_cleanup (g);
515           return -1;
516         }
517         if (r == 0) {
518           error (g, _("unexpected end of file when reading from daemon"));
519           child_cleanup (g);
520           return -1;
521         }
522         nr += r;
523
524         if (nr < 0)         /* Still not got the whole length word. */
525           continue;
526
527         XDR xdr;
528         xdrmem_create (&xdr, lenbuf, 4, XDR_DECODE);
529         xdr_uint32_t (&xdr, size_rtn);
530         xdr_destroy (&xdr);
531
532         /* *size_rtn changed, recalculate message_size */
533         message_size =
534           *size_rtn != GUESTFS_PROGRESS_FLAG ?
535           *size_rtn : PROGRESS_MESSAGE_SIZE;
536
537         if (*size_rtn == GUESTFS_LAUNCH_FLAG) {
538           if (g->state != LAUNCHING)
539             error (g, _("received magic signature from guestfsd, but in state %d"),
540                    g->state);
541           else {
542             g->state = READY;
543             guestfs___call_callbacks_void (g, GUESTFS_EVENT_LAUNCH_DONE);
544           }
545           return 0;
546         }
547         else if (*size_rtn == GUESTFS_CANCEL_FLAG)
548           return 0;
549         else if (*size_rtn == GUESTFS_PROGRESS_FLAG)
550           /*FALLTHROUGH*/;
551         /* If this happens, it's pretty bad and we've probably lost
552          * synchronization.
553          */
554         else if (*size_rtn > GUESTFS_MESSAGE_MAX) {
555           error (g, _("message length (%u) > maximum possible size (%d)"),
556                  (unsigned) *size_rtn, GUESTFS_MESSAGE_MAX);
557           return -1;
558         }
559
560         /* Allocate the complete buffer, size now known. */
561         *buf_rtn = safe_malloc (g, message_size);
562         /*FALLTHROUGH*/
563       }
564
565       size_t sizetoread = message_size - nr;
566       if (sizetoread > BUFSIZ) sizetoread = BUFSIZ;
567
568       r = read (g->sock, (char *) (*buf_rtn) + nr, sizetoread);
569       if (r == -1) {
570         if (errno == EINTR || errno == EAGAIN)
571           continue;
572         perrorf (g, "read");
573         free (*buf_rtn);
574         *buf_rtn = NULL;
575         return -1;
576       }
577       if (r == 0) {
578         error (g, _("unexpected end of file when reading from daemon"));
579         child_cleanup (g);
580         free (*buf_rtn);
581         *buf_rtn = NULL;
582         return -1;
583       }
584       nr += r;
585     }
586   }
587
588   /* Got the full message, caller can start processing it. */
589 #ifdef ENABLE_PACKET_DUMP
590   if (g->verbose) {
591     ssize_t i, j;
592
593     for (i = 0; i < nr; i += 16) {
594       printf ("%04zx: ", i);
595       for (j = i; j < MIN (i+16, nr); ++j)
596         printf ("%02x ", (*(unsigned char **)buf_rtn)[j]);
597       for (; j < i+16; ++j)
598         printf ("   ");
599       printf ("|");
600       for (j = i; j < MIN (i+16, nr); ++j)
601         if (c_isprint ((*(char **)buf_rtn)[j]))
602           printf ("%c", (*(char **)buf_rtn)[j]);
603         else
604           printf (".");
605       for (; j < i+16; ++j)
606         printf (" ");
607       printf ("|\n");
608     }
609   }
610 #endif
611
612   if (*size_rtn == GUESTFS_PROGRESS_FLAG) {
613     if (g->state == BUSY) {
614       guestfs_progress message;
615       XDR xdr;
616       xdrmem_create (&xdr, *buf_rtn, PROGRESS_MESSAGE_SIZE, XDR_DECODE);
617       xdr_guestfs_progress (&xdr, &message);
618       xdr_destroy (&xdr);
619
620       send_progress_message (g, &message);
621     }
622
623     free (*buf_rtn);
624     *buf_rtn = NULL;
625
626     /* Process next message. */
627     return guestfs___recv_from_daemon (g, size_rtn, buf_rtn);
628   }
629
630   return 0;
631 }
632
633 /* This is very much like recv_from_daemon above, but g->sock is
634  * a listening socket and we are accepting a new connection on
635  * that socket instead of reading anything.  Returns the newly
636  * accepted socket.
637  */
638 int
639 guestfs___accept_from_daemon (guestfs_h *g)
640 {
641   fd_set rset, rset2;
642
643   debug (g, "accept_from_daemon: %p g->state = %d", g, g->state);
644
645   FD_ZERO (&rset);
646
647   if (g->fd[1] >= 0)            /* Read qemu stdout for log messages & EOF. */
648     FD_SET (g->fd[1], &rset);
649   FD_SET (g->sock, &rset);      /* Read socket for accept. */
650
651   int max_fd = MAX (g->sock, g->fd[1]);
652   int sock = -1;
653
654   while (sock == -1) {
655     /* If the qemu process has died, clean up the zombie (RHBZ#579155).
656      * By partially polling in the select below we ensure that this
657      * function will be called eventually.
658      */
659     waitpid (g->pid, NULL, WNOHANG);
660
661     rset2 = rset;
662
663     struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
664     int r = select (max_fd+1, &rset2, NULL, NULL, &tv);
665     if (r == -1) {
666       if (errno == EINTR || errno == EAGAIN)
667         continue;
668       perrorf (g, "select");
669       return -1;
670     }
671
672     if (g->fd[1] >= 0 && FD_ISSET (g->fd[1], &rset2)) {
673       if (read_log_message_or_eof (g, g->fd[1], 1) == -1)
674         return -1;
675     }
676     if (FD_ISSET (g->sock, &rset2)) {
677       sock = accept (g->sock, NULL, NULL);
678       if (sock == -1) {
679         if (errno == EINTR || errno == EAGAIN)
680           continue;
681         perrorf (g, "accept");
682         return -1;
683       }
684     }
685   }
686
687   return sock;
688 }
689
690 int
691 guestfs___send (guestfs_h *g, int proc_nr,
692                 uint64_t progress_hint, uint64_t optargs_bitmask,
693                 xdrproc_t xdrp, char *args)
694 {
695   struct guestfs_message_header hdr;
696   XDR xdr;
697   u_int32_t len;
698   int serial = g->msg_next_serial++;
699   int r;
700   char *msg_out;
701   size_t msg_out_size;
702
703   if (g->state != BUSY) {
704     error (g, _("guestfs___send: state %d != BUSY"), g->state);
705     return -1;
706   }
707
708   /* We have to allocate this message buffer on the heap because
709    * it is quite large (although will be mostly unused).  We
710    * can't allocate it on the stack because in some environments
711    * we have quite limited stack space available, notably when
712    * running in the JVM.
713    */
714   msg_out = safe_malloc (g, GUESTFS_MESSAGE_MAX + 4);
715   xdrmem_create (&xdr, msg_out + 4, GUESTFS_MESSAGE_MAX, XDR_ENCODE);
716
717   /* Serialize the header. */
718   hdr.prog = GUESTFS_PROGRAM;
719   hdr.vers = GUESTFS_PROTOCOL_VERSION;
720   hdr.proc = proc_nr;
721   hdr.direction = GUESTFS_DIRECTION_CALL;
722   hdr.serial = serial;
723   hdr.status = GUESTFS_STATUS_OK;
724   hdr.progress_hint = progress_hint;
725   hdr.optargs_bitmask = optargs_bitmask;
726
727   if (!xdr_guestfs_message_header (&xdr, &hdr)) {
728     error (g, _("xdr_guestfs_message_header failed"));
729     goto cleanup1;
730   }
731
732   /* Serialize the args.  If any, because some message types
733    * have no parameters.
734    */
735   if (xdrp) {
736     if (!(*xdrp) (&xdr, args)) {
737       error (g, _("dispatch failed to marshal args"));
738       goto cleanup1;
739     }
740   }
741
742   /* Get the actual length of the message, resize the buffer to match
743    * the actual length, and write the length word at the beginning.
744    */
745   len = xdr_getpos (&xdr);
746   xdr_destroy (&xdr);
747
748   msg_out = safe_realloc (g, msg_out, len + 4);
749   msg_out_size = len + 4;
750
751   xdrmem_create (&xdr, msg_out, 4, XDR_ENCODE);
752   xdr_uint32_t (&xdr, &len);
753
754  again:
755   r = guestfs___send_to_daemon (g, msg_out, msg_out_size);
756   if (r == -2)                  /* Ignore stray daemon cancellations. */
757     goto again;
758   if (r == -1)
759     goto cleanup1;
760   free (msg_out);
761
762   return serial;
763
764  cleanup1:
765   free (msg_out);
766   return -1;
767 }
768
769 static int cancel = 0; /* XXX Implement file cancellation. */
770 static int send_file_chunk (guestfs_h *g, int cancel, const char *buf, size_t len);
771 static int send_file_data (guestfs_h *g, const char *buf, size_t len);
772 static int send_file_cancellation (guestfs_h *g);
773 static int send_file_complete (guestfs_h *g);
774
775 /* Send a file.
776  * Returns:
777  *   0 OK
778  *   -1 error
779  *   -2 daemon cancelled (we must read the error message)
780  */
781 int
782 guestfs___send_file (guestfs_h *g, const char *filename)
783 {
784   char buf[GUESTFS_MAX_CHUNK_SIZE];
785   int fd, r, err;
786
787   fd = open (filename, O_RDONLY);
788   if (fd == -1) {
789     perrorf (g, "open: %s", filename);
790     send_file_cancellation (g);
791     /* Daemon sees cancellation and won't reply, so caller can
792      * just return here.
793      */
794     return -1;
795   }
796
797   /* Send file in chunked encoding. */
798   while (!cancel) {
799     r = read (fd, buf, sizeof buf);
800     if (r == -1 && (errno == EINTR || errno == EAGAIN))
801       continue;
802     if (r <= 0) break;
803     err = send_file_data (g, buf, r);
804     if (err < 0) {
805       if (err == -2)            /* daemon sent cancellation */
806         send_file_cancellation (g);
807       return err;
808     }
809   }
810
811   if (cancel) {                 /* cancel from either end */
812     send_file_cancellation (g);
813     return -1;
814   }
815
816   if (r == -1) {
817     perrorf (g, "read: %s", filename);
818     send_file_cancellation (g);
819     return -1;
820   }
821
822   /* End of file, but before we send that, we need to close
823    * the file and check for errors.
824    */
825   if (close (fd) == -1) {
826     perrorf (g, "close: %s", filename);
827     send_file_cancellation (g);
828     return -1;
829   }
830
831   return send_file_complete (g);
832 }
833
834 /* Send a chunk of file data. */
835 static int
836 send_file_data (guestfs_h *g, const char *buf, size_t len)
837 {
838   return send_file_chunk (g, 0, buf, len);
839 }
840
841 /* Send a cancellation message. */
842 static int
843 send_file_cancellation (guestfs_h *g)
844 {
845   return send_file_chunk (g, 1, NULL, 0);
846 }
847
848 /* Send a file complete chunk. */
849 static int
850 send_file_complete (guestfs_h *g)
851 {
852   char buf[1];
853   return send_file_chunk (g, 0, buf, 0);
854 }
855
856 static int
857 send_file_chunk (guestfs_h *g, int cancel, const char *buf, size_t buflen)
858 {
859   u_int32_t len;
860   int r;
861   guestfs_chunk chunk;
862   XDR xdr;
863   char *msg_out;
864   size_t msg_out_size;
865
866   if (g->state != BUSY) {
867     error (g, _("send_file_chunk: state %d != READY"), g->state);
868     return -1;
869   }
870
871   /* Allocate the chunk buffer.  Don't use the stack to avoid
872    * excessive stack usage and unnecessary copies.
873    */
874   msg_out = safe_malloc (g, GUESTFS_MAX_CHUNK_SIZE + 4 + 48);
875   xdrmem_create (&xdr, msg_out + 4, GUESTFS_MAX_CHUNK_SIZE + 48, XDR_ENCODE);
876
877   /* Serialize the chunk. */
878   chunk.cancel = cancel;
879   chunk.data.data_len = buflen;
880   chunk.data.data_val = (char *) buf;
881
882   if (!xdr_guestfs_chunk (&xdr, &chunk)) {
883     error (g, _("xdr_guestfs_chunk failed (buf = %p, buflen = %zu)"),
884            buf, buflen);
885     xdr_destroy (&xdr);
886     goto cleanup1;
887   }
888
889   len = xdr_getpos (&xdr);
890   xdr_destroy (&xdr);
891
892   /* Reduce the size of the outgoing message buffer to the real length. */
893   msg_out = safe_realloc (g, msg_out, len + 4);
894   msg_out_size = len + 4;
895
896   xdrmem_create (&xdr, msg_out, 4, XDR_ENCODE);
897   xdr_uint32_t (&xdr, &len);
898
899   r = guestfs___send_to_daemon (g, msg_out, msg_out_size);
900
901   /* Did the daemon send a cancellation message? */
902   if (r == -2) {
903     debug (g, "got daemon cancellation");
904     return -2;
905   }
906
907   if (r == -1)
908     goto cleanup1;
909
910   free (msg_out);
911
912   return 0;
913
914  cleanup1:
915   free (msg_out);
916   return -1;
917 }
918
919 /* Receive a reply. */
920 int
921 guestfs___recv (guestfs_h *g, const char *fn,
922                 guestfs_message_header *hdr,
923                 guestfs_message_error *err,
924                 xdrproc_t xdrp, char *ret)
925 {
926   XDR xdr;
927   void *buf;
928   uint32_t size;
929   int r;
930
931  again:
932   r = guestfs___recv_from_daemon (g, &size, &buf);
933   if (r == -1)
934     return -1;
935
936   /* This can happen if a cancellation happens right at the end
937    * of us sending a FileIn parameter to the daemon.  Discard.  The
938    * daemon should send us an error message next.
939    */
940   if (size == GUESTFS_CANCEL_FLAG)
941     goto again;
942
943   if (size == GUESTFS_LAUNCH_FLAG) {
944     error (g, "%s: received unexpected launch flag from daemon when expecting reply", fn);
945     return -1;
946   }
947
948   xdrmem_create (&xdr, buf, size, XDR_DECODE);
949
950   if (!xdr_guestfs_message_header (&xdr, hdr)) {
951     error (g, "%s: failed to parse reply header", fn);
952     xdr_destroy (&xdr);
953     free (buf);
954     return -1;
955   }
956   if (hdr->status == GUESTFS_STATUS_ERROR) {
957     if (!xdr_guestfs_message_error (&xdr, err)) {
958       error (g, "%s: failed to parse reply error", fn);
959       xdr_destroy (&xdr);
960       free (buf);
961       return -1;
962     }
963   } else {
964     if (xdrp && ret && !xdrp (&xdr, ret)) {
965       error (g, "%s: failed to parse reply", fn);
966       xdr_destroy (&xdr);
967       free (buf);
968       return -1;
969     }
970   }
971   xdr_destroy (&xdr);
972   free (buf);
973
974   return 0;
975 }
976
977 /* Receive a file. */
978
979 /* Returns -1 = error, 0 = EOF, > 0 = more data */
980 static ssize_t receive_file_data (guestfs_h *g, void **buf);
981
982 int
983 guestfs___recv_file (guestfs_h *g, const char *filename)
984 {
985   void *buf;
986   int fd, r;
987
988   fd = open (filename, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0666);
989   if (fd == -1) {
990     perrorf (g, "open: %s", filename);
991     goto cancel;
992   }
993
994   /* Receive the file in chunked encoding. */
995   while ((r = receive_file_data (g, &buf)) > 0) {
996     if (xwrite (fd, buf, r) == -1) {
997       perrorf (g, "%s: write", filename);
998       free (buf);
999       goto cancel;
1000     }
1001     free (buf);
1002   }
1003
1004   if (r == -1) {
1005     error (g, _("%s: error in chunked encoding"), filename);
1006     return -1;
1007   }
1008
1009   if (close (fd) == -1) {
1010     perrorf (g, "close: %s", filename);
1011     return -1;
1012   }
1013
1014   return 0;
1015
1016  cancel: ;
1017   /* Send cancellation message to daemon, then wait until it
1018    * cancels (just throwing away data).
1019    */
1020   XDR xdr;
1021   char fbuf[4];
1022   uint32_t flag = GUESTFS_CANCEL_FLAG;
1023
1024   debug (g, "%s: waiting for daemon to acknowledge cancellation",
1025          __func__);
1026
1027   xdrmem_create (&xdr, fbuf, sizeof fbuf, XDR_ENCODE);
1028   xdr_uint32_t (&xdr, &flag);
1029   xdr_destroy (&xdr);
1030
1031   if (xwrite (g->sock, fbuf, sizeof fbuf) == -1) {
1032     perrorf (g, _("write to daemon socket"));
1033     return -1;
1034   }
1035
1036   while (receive_file_data (g, NULL) > 0)
1037     ;                           /* just discard it */
1038
1039   return -1;
1040 }
1041
1042 /* Receive a chunk of file data. */
1043 /* Returns -1 = error, 0 = EOF, > 0 = more data */
1044 static ssize_t
1045 receive_file_data (guestfs_h *g, void **buf_r)
1046 {
1047   int r;
1048   void *buf;
1049   uint32_t len;
1050   XDR xdr;
1051   guestfs_chunk chunk;
1052
1053   r = guestfs___recv_from_daemon (g, &len, &buf);
1054   if (r == -1) {
1055     error (g, _("receive_file_data: parse error in reply callback"));
1056     return -1;
1057   }
1058
1059   if (len == GUESTFS_LAUNCH_FLAG || len == GUESTFS_CANCEL_FLAG) {
1060     error (g, _("receive_file_data: unexpected flag received when reading file chunks"));
1061     return -1;
1062   }
1063
1064   memset (&chunk, 0, sizeof chunk);
1065
1066   xdrmem_create (&xdr, buf, len, XDR_DECODE);
1067   if (!xdr_guestfs_chunk (&xdr, &chunk)) {
1068     error (g, _("failed to parse file chunk"));
1069     free (buf);
1070     return -1;
1071   }
1072   xdr_destroy (&xdr);
1073   /* After decoding, the original buffer is no longer used. */
1074   free (buf);
1075
1076   if (chunk.cancel) {
1077     error (g, _("file receive cancelled by daemon"));
1078     free (chunk.data.data_val);
1079     return -1;
1080   }
1081
1082   if (chunk.data.data_len == 0) { /* end of transfer */
1083     free (chunk.data.data_val);
1084     return 0;
1085   }
1086
1087   if (buf_r) *buf_r = chunk.data.data_val;
1088   else free (chunk.data.data_val); /* else caller frees */
1089
1090   return chunk.data.data_len;
1091 }