Disk usage dialog.
[guestfs-browser.git] / slave.ml
1 (* Guestfs Browser.
2  * Copyright (C) 2010 Red Hat Inc.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  *)
18
19 open ExtList
20 open Printf
21 open Utils
22
23 module C = Libvirt.Connect
24 module Cond = Condition
25 module D = Libvirt.Domain
26 module G = Guestfs
27 module M = Mutex
28 module Q = Queue
29
30 type 'a callback = 'a -> unit
31
32 (* The commands. *)
33 type command =
34   | Exit_thread
35   | Connect of string option * unit callback
36   | Get_domains of domain list callback
37   | Open_domain of string * rw_flag callback
38   | Open_images of string list * rw_flag callback
39   | Get_volumes of volume callback
40   | Read_directory of string * string * direntry list callback
41   | Disk_usage of string * string * int64 callback
42   | Export_dir_to of export_t * string * string * string * unit callback
43
44 and domain = {
45   dom_id : int;
46   dom_name : string;
47   dom_state : D.state;
48 }
49
50 and rw_flag = RO | RW
51
52 and volume = {
53   vol_device : string;
54   vol_type : string;
55   vol_label : string;
56   vol_uuid : string;
57   vol_statvfs : Guestfs.statvfs;
58 }
59
60 and direntry = {
61   dent_name : string;
62   dent_stat : Guestfs.stat;
63   dent_link : string;
64 }
65
66 and export_t =
67   | Export_tar
68   | Export_tgz
69   | Export_checksums of string
70   | Export_list
71
72 let rec string_of_command = function
73   | Exit_thread -> "Exit_thread"
74   | Connect (Some name, _) -> sprintf "Connect %s" name
75   | Connect (None, _) -> "Connect NULL"
76   | Get_domains _ -> "Get_domains"
77   | Open_domain (name, _) -> sprintf "Open_domain %s" name
78   | Open_images (images, _) ->
79       sprintf "Open_images [%s]" (String.concat "; " images)
80   | Get_volumes _ -> "Get_volumes"
81   | Read_directory (dev, dir, _) -> sprintf "Read_directory %s %s" dev dir
82   | Disk_usage (dev, dir, _) -> sprintf "Disk_usage %s %s" dev dir
83   | Export_dir_to (t, dev, dir, file, _) ->
84       sprintf "Export_dir_to %s %s %s %s" (string_of_export_t t) dev dir file
85
86 and string_of_export_t = function
87   | Export_tar -> "Export_tar"
88   | Export_tgz -> "Export_tgz"
89   | Export_checksums alg -> sprintf "Export_checksums %s" alg
90   | Export_list -> "Export_list"
91
92 and string_of_rw_flag = function RO -> "RO" | RW -> "RW"
93
94 let no_callback _ = ()
95
96 let failure_hook = ref (fun _ -> ())
97 let busy_hook = ref (fun _ -> ())
98 let idle_hook = ref (fun _ -> ())
99
100 let set_failure_hook cb = failure_hook := cb
101 let set_busy_hook cb = busy_hook := cb
102 let set_idle_hook cb = idle_hook := cb
103
104 (* Execute a function, while holding a mutex.  If the function
105  * fails, ensure we release the mutex before rethrowing the
106  * exception.
107  *)
108 let with_lock m f =
109   M.lock m;
110   let r = try Left (f ()) with exn -> Right exn in
111   M.unlock m;
112   match r with
113   | Left r -> r
114   | Right exn -> raise exn
115
116 (* The queue of commands, and a lock and condition to protect it. *)
117 let q = Q.create ()
118 let q_lock = M.create ()
119 let q_cond = Cond.create ()
120
121 (* Send a command message to the slave thread. *)
122 let send_to_slave cmd =
123   debug "sending message %s to slave thread ..." (string_of_command cmd);
124   with_lock q_lock (
125     fun () ->
126       Q.push cmd q;
127       Cond.signal q_cond
128   )
129
130 let discard_command_queue () = with_lock q_lock (fun () -> Q.clear q)
131
132 let connect uri cb = send_to_slave (Connect (uri, cb))
133 let get_domains cb = send_to_slave (Get_domains cb)
134 let get_volumes cb = send_to_slave (Get_volumes cb)
135 let open_domain name cb = send_to_slave (Open_domain (name, cb))
136 let open_images images cb = send_to_slave (Open_images (images, cb))
137 let read_directory dev dir cb = send_to_slave (Read_directory (dev, dir, cb))
138 let disk_usage dev dir cb = send_to_slave (Disk_usage (dev, dir, cb))
139 let export_dir_to t dev dir file cb =
140   send_to_slave (Export_dir_to (t, dev, dir, file, cb))
141
142 (*----- Slave thread starts here -----*)
143
144 (* Set this to true to exit the thread. *)
145 let quit = ref false
146
147 (* Handles.  These are not protected by locks because only the slave
148  * thread has access to them.
149  *)
150 let conn = ref None
151 let g = ref None
152
153 (* Call 'f ()' with 'dev' mounted read-only.  Ensure that everything
154  * is unmounted even if an exception is thrown.
155  *)
156 let with_mount_ro g dev (f : unit -> 'a) : 'a =
157   Std.finally (fun () -> G.umount_all g) (
158     fun () ->
159       G.mount_ro g dev "/";
160       f ()
161   ) ()
162
163 let rec loop () =
164   debug "thread id %d: top of slave loop ..." (Thread.id (Thread.self ()));
165
166   (* Get the next command. *)
167   let cmd =
168     with_lock q_lock (
169       fun () ->
170         while Q.is_empty q do
171           Cond.wait q_cond q_lock
172         done;
173         Q.pop q
174     ) in
175
176   debug "thread id %d: slave processing command %s ..."
177     (Thread.id (Thread.self ())) (string_of_command cmd);
178
179   (try
180      GtkThread.async !busy_hook ();
181      execute_command cmd;
182    with exn ->
183      (* If a command fails, clear the command queue and run the
184       * failure hook in the main thread.
185       *)
186      discard_command_queue ();
187      GtkThread.async !failure_hook exn
188   );
189
190   (* If there are no more commands in the queue, run the idle hook. *)
191   let r = with_lock q_lock (fun () -> Q.is_empty q) in
192   if r then GtkThread.async !idle_hook ();
193
194   if !quit then Thread.exit ();
195   loop ()
196
197 and execute_command = function
198   | Exit_thread ->
199       quit := true;
200       close_all ()
201
202   | Connect (name, cb) ->
203       close_all ();
204       conn := Some (C.connect_readonly ?name ());
205       GtkThread.async cb ()
206
207   | Get_domains cb ->
208       let conn = get_conn () in
209       let doms = D.get_domains conn [D.ListAll] in
210       let doms = List.map (
211         fun d ->
212           { dom_id = D.get_id d;
213             dom_name = D.get_name d;
214             dom_state = (D.get_info d).D.state }
215       ) doms in
216       let cmp { dom_name = n1 } { dom_name = n2 } = compare n1 n2 in
217       let doms = List.sort ~cmp doms in
218       GtkThread.async cb doms
219
220   | Open_domain (name, cb) ->
221       let conn = get_conn () in
222       let dom = D.lookup_by_name conn name in
223       (* Only permit writes to shut off domains.  This isn't foolproof
224        * since the user could start up the domain while we're running,
225        * which would cause disk corruption.  Until we can negotiate a
226        * feasible locking scheme with libvirt/qemu, this is the best we
227        * can do.
228        *)
229       let rw = write_flag () && (D.get_info dom).D.state = D.InfoShutoff in
230       let rw = if rw then RW else RO in
231       let xml = D.get_xml_desc dom in
232       let images = get_disk_images_from_xml xml in
233       open_disk_images rw images cb
234
235   | Open_images (images, cb) ->
236       let rw = write_flag () in
237       let rw = if rw then RW else RO in
238       open_disk_images rw images cb
239
240   | Get_volumes cb ->
241       let g = get_g () in
242       (* Devices which directly contain filesystems (RHBZ#590167). *)
243       let devices = G.list_devices g in
244       Array.iter (if_mountable_vol g cb) devices;
245       let partitions = G.list_partitions g in
246       Array.iter (if_mountable_vol g cb) partitions;
247       let lvs = G.lvs g in
248       Array.iter (if_mountable_vol g cb) lvs
249
250   | Read_directory (dev, dir, cb) ->
251       let g = get_g () in
252       let names, stats, links =
253         with_mount_ro g dev (
254           fun () ->
255             let names = G.ls g dir in (* sorted and without . and .. *)
256             let names = Array.to_list names in
257             let stats = lstatlist_wrapper g dir names in
258             let links = readlinklist_wrapper g dir names in
259             names, stats, links
260         ) in
261       assert (
262         let n = List.length names in
263         n = List.length stats && n = List.length links
264       );
265       let entries = List.combine (List.combine names stats) links in
266       let entries = List.map (
267         fun ((name, stat), link) ->
268           { dent_name = name; dent_stat = stat; dent_link = link }
269       ) entries in
270       GtkThread.async cb entries
271
272   | Disk_usage (dev, dir, cb) ->
273       let g = get_g () in
274       let kb = with_mount_ro g dev (fun () -> G.du g dir) in
275       GtkThread.async cb kb
276
277   | Export_dir_to (t, dev, dir, file, cb) ->
278       let g = get_g () in
279       with_mount_ro g dev (
280         fun () ->
281           (match t with
282            | Export_tar -> G.tar_out g
283            | Export_tgz -> G.tgz_out g
284            | Export_checksums alg -> G.checksums_out g alg
285            | Export_list -> G.find0 g) dir file
286       );
287       GtkThread.async cb ()
288
289 (* Expect to be connected, and return the current libvirt connection. *)
290 and get_conn () =
291   match !conn with
292   | Some conn -> conn
293   | None -> failwith "not connected to libvirt"
294
295 and get_g () =
296   match !g with
297   | Some g -> g
298   | None -> failwith "no domain or disk image is open"
299
300 (* Close all libvirt and libguestfs handles. *)
301 and close_all () =
302   (match !conn with Some conn -> C.close conn | None -> ());
303   conn := None;
304   close_g ()
305
306 and close_g () =
307   (match !g with Some g -> G.close g | None -> ());
308   g := None
309
310 and get_disk_images_from_xml xml =
311   let xml = Xml.parse_string xml in
312   let devices =
313     match xml with
314     | Xml.Element ("domain", _, children) ->
315         let devices =
316           List.filter_map (
317             function
318             | Xml.Element ("devices", _, devices) -> Some devices
319             | _ -> None
320           ) children in
321         List.concat devices
322     | _ ->
323         failwith "get_xml_desc didn't return <domain/>" in
324   let rec source_of = function          (* <source file|dev=...> *)
325     | [] -> None
326     | Xml.Element ("source", attrs, _) :: rest ->
327         (try Some (List.assoc "dev" attrs)
328          with Not_found ->
329            try Some (List.assoc "file" attrs)
330            with Not_found ->
331              source_of rest)
332     | _ :: rest -> source_of rest
333   in
334   let blkdevs =
335     List.filter_map (
336       function
337       | Xml.Element ("disk", _, children) -> source_of children
338       | _ -> None
339     ) devices in
340   blkdevs
341
342 (* The common code for Open_domain and Open_images which opens the
343  * libguestfs handle, adds the disks, and launches the appliance.
344  *)
345 and open_disk_images rw images cb =
346   debug "opening disk image [%s] in %s mode"
347     (String.concat "; " images) (string_of_rw_flag rw);
348
349   close_g ();
350   let g' = G.create () in
351   g := Some g';
352   let g = g' in
353
354   G.set_verbose g (verbose ());
355
356   let add = (match rw with RO -> G.add_drive_ro | RW -> G.add_drive) g in
357   List.iter add images;
358
359   G.launch g;
360   GtkThread.async cb rw
361
362 (* This is the common function implementing Get_volumes.  Test if a
363  * particular partition contains a mountable filesystem.  We do this
364  * simply by trying to mount it.  If it does, get the rest of the
365  * information for the volume, and call the callback.
366  *)
367 and if_mountable_vol g cb dev =
368   try
369     with_mount_ro g dev (
370       fun () ->
371         let vol_type = G.vfs_type g dev in
372         let vol_label = G.vfs_label g dev in
373         let vol_uuid = G.vfs_uuid g dev in
374         let vol_statvfs = G.statvfs g "/" in
375         let vol = {
376           vol_device = dev; vol_type = vol_type; vol_label = vol_label;
377           vol_uuid = vol_uuid; vol_statvfs = vol_statvfs
378         } in
379         GtkThread.async cb vol
380     )
381   with G.Error msg ->
382     debug "is_mountable: %s: not mountable because: %s" dev msg
383
384 (* guestfs_lstatlist has a "hidden" limit of the protocol message size.
385  * Call this function, but split the list of names into chunks.
386  *)
387 and lstatlist_wrapper g dir = function
388   | [] -> []
389   | names ->
390       let names', names = List.take 1000 names, List.drop 1000 names in
391       let xs = G.lstatlist g dir (Array.of_list names') in
392       let xs = Array.to_list xs in
393       xs @ lstatlist_wrapper g dir names
394
395 (* Same as above for guestfs_readlinklist. *)
396 and readlinklist_wrapper g dir = function
397   | [] -> []
398   | names ->
399       let names', names = List.take 1000 names, List.drop 1000 names in
400       let xs = G.readlinklist g dir (Array.of_list names') in
401       let xs = Array.to_list xs in
402       xs @ readlinklist_wrapper g dir names
403
404 (* Start up one slave thread. *)
405 let slave_thread = Thread.create loop ()
406
407 (* Note the following function is called from the main thread. *)
408 let exit_thread () =
409   discard_command_queue ();
410   send_to_slave Exit_thread;
411   Thread.join slave_thread