mclu version 2
[mclu.git] / parallel.ml
diff --git a/parallel.ml b/parallel.ml
new file mode 100644 (file)
index 0000000..086611e
--- /dev/null
@@ -0,0 +1,84 @@
+(* mclu: Mini Cloud
+ * Copyright (C) 2014-2015 Red Hat Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ *)
+
+(* Simple forking parallel primitives. *)
+
+open Printf
+open Unix
+
+let map f xs =
+  let xs = List.map (
+    fun x ->
+      let rfd, wfd = pipe () in
+      (x, rfd, wfd)
+  ) xs in
+
+  let xs = List.map (
+    fun (x, rfd, wfd) ->
+      match fork () with
+      | 0 ->                            (* child *)
+        close rfd;
+        let y = Printexc.catch f x in
+        (* Write the final value to the pipe. *)
+        output_value (out_channel_of_descr wfd) y;
+        exit 0
+
+      | pid ->                          (* parent *)
+        close wfd;
+        (pid, rfd)
+  ) xs in
+
+  let errors = ref 0 in
+  let xs = List.map (
+    fun (pid, rfd) ->
+      (* Read all the output from the pipe. *)
+      let buf = Buffer.create 13 in
+      let bytes = Bytes.create 4096 in
+      let rec loop () =
+        let len = read rfd bytes 0 (Bytes.length bytes) in
+        if len > 0 then (
+          Buffer.add_subbytes buf bytes 0 len;
+          loop ()
+        )
+      in
+      loop ();
+      let str = Buffer.to_bytes buf in
+
+      (* Wait for the subprocess. *)
+      match waitpid [] pid with
+      | _, WEXITED 0 -> str
+      | pid, WEXITED i ->
+        eprintf "mclu: subprocess pid %d died with exit status %d\n" pid i;
+        incr errors;
+        Bytes.empty
+      | pid, WSIGNALED i ->
+        eprintf "mclu: subprocess pid %d died with signal %d\n" pid i;
+        incr errors;
+        Bytes.empty
+      | pid, WSTOPPED i ->
+        eprintf "mclu: subprocess pid %d stopped with signal %d\n" pid i;
+        incr errors;
+        Bytes.empty
+  ) xs in
+
+  if !errors > 0 then
+    exit 1;
+
+  xs
+
+let iter f xs = ignore (map f xs)