X-Git-Url: http://git.annexia.org/?p=guestfs-browser.git;a=blobdiff_plain;f=slave.ml;h=d1cff80b5064f165b1a8050dabab55171d3203eb;hp=1017dd89624a0ae1c856441575cc0ee7080bde10;hb=b07102fda0034da5840a9f33bd6d404a195b8cc9;hpb=bbfe03c47f1d7f03c3e6c0cab9e4f500f588c80a;ds=sidebyside diff --git a/slave.ml b/slave.ml index 1017dd8..d1cff80 100644 --- a/slave.ml +++ b/slave.ml @@ -16,6 +16,8 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. *) +open ExtList +open Printf open Utils module C = Libvirt.Connect @@ -32,6 +34,10 @@ type command = | Exit_thread | Connect of string option * unit callback | Get_domains of domain list callback + | Open_domain of string * rw_flag callback + | Open_images of string list * rw_flag callback + | Get_volumes of volume callback + | Read_directory of string * string * direntry list callback and domain = { dom_id : int; @@ -39,6 +45,35 @@ and domain = { dom_state : D.state; } +and rw_flag = RO | RW + +and volume = { + vol_device : string; + vol_type : string; + vol_label : string; + vol_uuid : string; + vol_statvfs : Guestfs.statvfs; +} + +and direntry = { + dent_name : string; + dent_stat : Guestfs.stat; + dent_link : string; +} + +let string_of_command = function + | Exit_thread -> "Exit_thread" + | Connect (Some name, _) -> sprintf "Connect %s" name + | Connect (None, _) -> "Connect NULL" + | Get_domains _ -> "Get_domains" + | Open_domain (name, _) -> sprintf "Open_domain %s" name + | Open_images (images, _) -> + sprintf "Open_images [%s]" (String.concat "; " images) + | Get_volumes _ -> "Get_volumes" + | Read_directory (dev, dir, _) -> sprintf "Read_directory %s %s" dev dir + +let string_of_rw_flag = function RO -> "RO" | RW -> "RW" + let no_callback _ = () let failure_hook = ref (fun _ -> ()) @@ -68,7 +103,7 @@ let q_cond = Cond.create () (* Send a command message to the slave thread. *) let send_to_slave cmd = - debug "sending message %s to slave thread ..." (string_of_command cmd) + debug "sending message %s to slave thread ..." (string_of_command cmd); with_lock q_lock ( fun () -> Q.push cmd q; @@ -79,12 +114,32 @@ let discard_command_queue () = with_lock q_lock (fun () -> Q.clear q) let connect uri cb = send_to_slave (Connect (uri, cb)) let get_domains cb = send_to_slave (Get_domains cb) +let get_volumes cb = send_to_slave (Get_volumes cb) +let open_domain name cb = send_to_slave (Open_domain (name, cb)) +let open_images images cb = send_to_slave (Open_images (images, cb)) +let read_directory dev dir cb = send_to_slave (Read_directory (dev, dir, cb)) (*----- Slave thread starts here -----*) (* Set this to true to exit the thread. *) let quit = ref false +(* Handles. These are not protected by locks because only the slave + * thread has access to them. + *) +let conn = ref None +let g = ref None + +(* Call 'f ()' with 'dev' mounted read-only. Ensure that everything + * is unmounted even if an exception is thrown. + *) +let with_mount_ro g dev (f : unit -> 'a) : 'a = + Std.finally (fun () -> G.umount_all g) ( + fun () -> + G.mount_ro g dev "/"; + f () + ) () + let rec loop () = (* Get the next command. *) let cmd = @@ -96,57 +151,216 @@ let rec loop () = Q.pop q ) in - debug "slave thread processing command %s ..." (string_of_command cmd); + debug "thread id %d: slave processing command %s ..." + (Thread.id (Thread.self ())) (string_of_command cmd); (try - call_callback !busy_hook (); + GtkThread.async !busy_hook (); execute_command cmd; - call_callback !idle_hook (); with exn -> (* If a command fails, clear the command queue and run the * failure hook in the main thread. *) - call_callback !idle_hook (); discard_command_queue (); - call_callback !failure_hook exn + GtkThread.async !failure_hook exn ); + (* If there are no more commands in the queue, run the idle hook. *) + let r = with_lock q_lock (fun () -> Q.is_empty q) in + if r then GtkThread.async !idle_hook (); + if !quit then Thread.exit (); loop () and execute_command = function | Exit_thread -> quit := true; - disconnect_all () + close_all () - | Connect (uri, cb) -> - disconnect_all (); - conn := Some (C.connect_readonly ?uri ()); - call_callback cb () + | Connect (name, cb) -> + close_all (); + conn := Some (C.connect_readonly ?name ()); + GtkThread.async cb () | Get_domains cb -> let conn = get_conn () in let doms = D.get_domains conn [D.ListAll] in let doms = List.map ( fun d -> - D.get_id d, D.get_name d, (D.get_info d).D.state + { dom_id = D.get_id d; + dom_name = D.get_name d; + dom_state = (D.get_info d).D.state } ) doms in - call_callback cb doms + let cmp { dom_name = n1 } { dom_name = n2 } = compare n1 n2 in + let doms = List.sort ~cmp doms in + GtkThread.async cb doms + + | Open_domain (name, cb) -> + let conn = get_conn () in + let dom = D.lookup_by_name conn name in + (* Only permit writes to shut off domains. This isn't foolproof + * since the user could start up the domain while we're running, + * which would cause disk corruption. Until we can negotiate a + * feasible locking scheme with libvirt/qemu, this is the best we + * can do. + *) + let rw = write_flag () && (D.get_info dom).D.state = D.InfoShutoff in + let rw = if rw then RW else RO in + let xml = D.get_xml_desc dom in + let images = get_disk_images_from_xml xml in + open_disk_images rw images cb + + | Open_images (images, cb) -> + let rw = write_flag () in + let rw = if rw then RW else RO in + open_disk_images rw images cb -(* Call a callback function or hook in the main thread. *) -and call_callback cb arg = - GtkThread.async cb arg + | Get_volumes cb -> + let g = get_g () in + (* Devices which directly contain filesystems (RHBZ#590167). *) + let devices = G.list_devices g in + Array.iter (if_mountable_vol g cb) devices; + let partitions = G.list_partitions g in + Array.iter (if_mountable_vol g cb) partitions; + let lvs = G.lvs g in + Array.iter (if_mountable_vol g cb) lvs + + | Read_directory (dev, dir, cb) -> + let g = get_g () in + let names, stats, links = + with_mount_ro g dev ( + fun () -> + let names = G.ls g dir in (* sorted and without . and .. *) + let names = Array.to_list names in + let stats = lstatlist_wrapper g dir names in + let links = readlinklist_wrapper g dir names in + names, stats, links + ) in + assert ( + let n = List.length names in + n = List.length stats && n = List.length links + ); + let entries = List.combine (List.combine names stats) links in + let entries = List.map ( + fun ((name, stat), link) -> + { dent_name = name; dent_stat = stat; dent_link = link } + ) entries in + GtkThread.async cb entries (* Expect to be connected, and return the current libvirt connection. *) -let get_conn () = +and get_conn () = match !conn with | Some conn -> conn | None -> failwith "not connected to libvirt" +and get_g () = + match !g with + | Some g -> g + | None -> failwith "no domain or disk image is open" + (* Close all libvirt and libguestfs handles. *) -and disconnect_all () = +and close_all () = (match !conn with Some conn -> C.close conn | None -> ()); - conn := None + conn := None; + close_g () + +and close_g () = + (match !g with Some g -> G.close g | None -> ()); + g := None + +and get_disk_images_from_xml xml = + let xml = Xml.parse_string xml in + let devices = + match xml with + | Xml.Element ("domain", _, children) -> + let devices = + List.filter_map ( + function + | Xml.Element ("devices", _, devices) -> Some devices + | _ -> None + ) children in + List.concat devices + | _ -> + failwith "get_xml_desc didn't return " in + let rec source_of = function (* *) + | [] -> None + | Xml.Element ("source", attrs, _) :: rest -> + (try Some (List.assoc "dev" attrs) + with Not_found -> + try Some (List.assoc "file" attrs) + with Not_found -> + source_of rest) + | _ :: rest -> source_of rest + in + let blkdevs = + List.filter_map ( + function + | Xml.Element ("disk", _, children) -> source_of children + | _ -> None + ) devices in + blkdevs + +(* The common code for Open_domain and Open_images which opens the + * libguestfs handle, adds the disks, and launches the appliance. + *) +and open_disk_images rw images cb = + debug "opening disk image [%s] in %s mode" + (String.concat "; " images) (string_of_rw_flag rw); + + close_g (); + let g' = G.create () in + g := Some g'; + let g = g' in + + G.set_verbose g (verbose ()); + + let add = (match rw with RO -> G.add_drive_ro | RW -> G.add_drive) g in + List.iter add images; + + G.launch g; + GtkThread.async cb rw + +(* This is the common function implementing Get_volumes. Test if a + * particular partition contains a mountable filesystem. We do this + * simply by trying to mount it. If it does, get the rest of the + * information for the volume, and call the callback. + *) +and if_mountable_vol g cb dev = + try + with_mount_ro g dev ( + fun () -> + let vol_type = G.vfs_type g dev in + let vol_label = G.vfs_label g dev in + let vol_uuid = G.vfs_uuid g dev in + let vol_statvfs = G.statvfs g "/" in + let vol = { + vol_device = dev; vol_type = vol_type; vol_label = vol_label; + vol_uuid = vol_uuid; vol_statvfs = vol_statvfs + } in + GtkThread.async cb vol + ) + with G.Error msg -> + debug "is_mountable: %s: not mountable because: %s" dev msg + +(* guestfs_lstatlist has a "hidden" limit of the protocol message size. + * Call this function, but split the list of names into chunks. + *) +and lstatlist_wrapper g dir = function + | [] -> [] + | names -> + let names', names = List.take 1000 names, List.drop 1000 names in + let xs = G.lstatlist g dir (Array.of_list names') in + let xs = Array.to_list xs in + xs @ lstatlist_wrapper g dir names + +(* Same as above for guestfs_readlinklist. *) +and readlinklist_wrapper g dir = function + | [] -> [] + | names -> + let names', names = List.take 1000 names, List.drop 1000 names in + let xs = G.readlinklist g dir (Array.of_list names') in + let xs = Array.to_list xs in + xs @ readlinklist_wrapper g dir names (* Start up one slave thread. *) let slave_thread = Thread.create loop ()