Add threading support.
authorRichard W.M. Jones <rjones@redhat.com>
Mon, 21 Oct 2013 13:40:01 +0000 (14:40 +0100)
committerRichard W.M. Jones <rjones@redhat.com>
Mon, 21 Oct 2013 15:04:33 +0000 (16:04 +0100)
.gitignore
Makefile.am
pxzcat.c

index 483e91c..e288ce8 100644 (file)
@@ -2,6 +2,7 @@
 *.o
 
 .deps
+.gdb_history
 Makefile
 Makefile.in
 
index 324e946..ddc657a 100644 (file)
@@ -3,4 +3,5 @@ ACLOCAL_AMFLAGS = -I m4
 bin_PROGRAMS = pxzcat
 
 pxzcat_SOURCES = pxzcat.c
-pxzcat_LDADD = $(LIBLZMA_LIBS)
+pxzcat_CFLAGS = -pthread
+pxzcat_LDADD = $(LIBLZMA_LIBS) -lpthread
index 18cc751..35d0422 100644 (file)
--- a/pxzcat.c
+++ b/pxzcat.c
@@ -44,6 +44,7 @@
 #include <error.h>
 #include <errno.h>
 #include <getopt.h>
+#include <pthread.h>
 
 #include <lzma.h>
 
 #define debug(fs,...) /* nothing */
 #endif
 
+/* Size of buffers used in decompression loop. */
+#define BUFFER_SIZE (64*1024)
+
 #define XZ_HEADER_MAGIC     "\xfd" "7zXZ\0"
 #define XZ_HEADER_MAGIC_LEN 6
 #define XZ_FOOTER_MAGIC     "YZ"
 #define XZ_FOOTER_MAGIC_LEN 2
 
 static void usage (int exitcode);
-static void xzfile_uncompress (const char *filename, const char *outputfile);
+static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads);
 static int check_header_magic (int fd);
 static lzma_index *parse_indexes (const char *filename, int fd);
-static void iter_blocks (lzma_index *idx, const char *filename, int fd, const char *outputfile, int ofd);
+static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd);
 
 static struct option long_options[] = {
   { "output",   required_argument,  0, 'o' },
+  { "threads",  required_argument,  0, 'T' },
   { "help",     0,                  0, '?' },
   { NULL,       0,                  0, 0   }
 };
 
-static const char *options = "o:";
+static const char *options = "o:T:";
 
 int
 main (int argc, char *argv[])
 {
   int c;
   int longopt_index;
+  unsigned nr_threads = 0;
   const char *outputfile = NULL;
 
   for (;;) {
@@ -95,6 +101,11 @@ main (int argc, char *argv[])
       outputfile = optarg;
       break;
 
+    case 'T':
+      if (sscanf (optarg, "%u", &nr_threads) != 1)
+        error (EXIT_FAILURE, 0, "cannot parse -T option");
+      break;
+
     case '?':
       usage (EXIT_SUCCESS);
 
@@ -103,13 +114,21 @@ main (int argc, char *argv[])
     }
   }
 
-  if (outputfile == NULL)
-    error (EXIT_FAILURE, 0, "you must give the -o (output file) option\n");
-
   if (optind != argc - 1)
     usage (EXIT_FAILURE);
 
-  xzfile_uncompress (argv[optind], outputfile);
+  if (outputfile == NULL)
+    error (EXIT_FAILURE, 0, "you must give the -o (output file) option");
+
+  /* -T 0 (default) means use all cores. */
+  if (nr_threads == 0) {
+    long i = sysconf (_SC_NPROCESSORS_ONLN);
+    if (i <= 0)
+      error (EXIT_FAILURE, errno, "could not get number of cores");
+    nr_threads = (unsigned) i;
+  }
+
+  xzfile_uncompress (argv[optind], outputfile, nr_threads);
 
   exit (EXIT_SUCCESS);
 }
@@ -117,12 +136,13 @@ main (int argc, char *argv[])
 static void
 usage (int exitcode)
 {
-  printf ("usage: pxzcat -o output input.xz\n");
+  printf ("usage: pxzcat -o output [-T #threads] input.xz\n");
   exit (exitcode);
 }
 
 static void
-xzfile_uncompress (const char *filename, const char *outputfile)
+xzfile_uncompress (const char *filename, const char *outputfile,
+                   unsigned nr_threads)
 {
   int fd, ofd;
   uint64_t size;
@@ -150,8 +170,8 @@ xzfile_uncompress (const char *filename, const char *outputfile)
   if (ftruncate (ofd, size) == -1)
     error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile);
 
-  /* Iterate over blocks and uncompress. */
-  iter_blocks (idx, filename, fd, outputfile, ofd);
+  /* Iterate over blocks. */
+  iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd);
 
   close (fd);
 }
@@ -325,8 +345,6 @@ parse_indexes (const char *filename, int fd)
   return combined_index;
 }
 
-#define BUFFER_SIZE (64*1024)
-
 /* Return true iff the buffer is all zero bytes.
  *
  * Note that gcc is smart enough to optimize this properly:
@@ -345,12 +363,95 @@ is_zero (const char *buffer, size_t size)
   return 1;
 }
 
-/* Iterate over the blocks and uncompress. */
+struct global_state {
+  /* Current iterator.  Threads update this, but it is protected by a
+   * mutex, and each thread takes a copy of it when working on it.
+   */
+  lzma_index_iter iter;
+  lzma_bool iter_finished;
+  pthread_mutex_t iter_mutex;
+
+  /* Note that all threads are accessing these fds, so you have
+   * to use pread/pwrite instead of lseek!
+   */
+
+  /* Input file. */
+  const char *filename;
+  int fd;
+
+  /* Output file. */
+  const char *outputfile;
+  int ofd;
+};
+
+struct per_thread_state {
+  unsigned thread_num;
+  struct global_state *global;
+  int status;
+};
+
+/* Create threads to iterate over the blocks and uncompress. */
+static void *worker_thread (void *vp);
+
 static void
-iter_blocks (lzma_index *idx,
+iter_blocks (lzma_index *idx, unsigned nr_threads,
              const char *filename, int fd, const char *outputfile, int ofd)
 {
+  struct global_state global;
+  struct per_thread_state per_thread[nr_threads];
+  pthread_t thread[nr_threads];
+  unsigned u, nr_errors;
+  int err;
+  void *status;
+
+  lzma_index_iter_init (&global.iter, idx);
+  global.iter_finished = 0;
+  err = pthread_mutex_init (&global.iter_mutex, NULL);
+  if (err != 0)
+    error (EXIT_FAILURE, err, "pthread_mutex_init");
+
+  global.filename = filename;
+  global.fd = fd;
+  global.outputfile = outputfile;
+  global.ofd = ofd;
+
+  for (u = 0; u < nr_threads; ++u) {
+    per_thread[u].thread_num = u;
+    per_thread[u].global = &global;
+  }
+
+  /* Start the threads. */
+  for (u = 0; u < nr_threads; ++u) {
+    err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]);
+    if (err != 0)
+      error (EXIT_FAILURE, err, "pthread_create (%u)", u);
+  }
+
+  /* Wait for the threads to exit. */
+  nr_errors = 0;
+  for (u = 0; u < nr_threads; ++u) {
+    err = pthread_join (thread[u], &status);
+    if (err != 0) {
+      error (0, err, "pthread_join (%u)", u);
+      nr_errors++;
+    }
+    if (*(int *)status == -1)
+      nr_errors++;
+  }
+
+  if (nr_errors > 0)
+    exit (EXIT_FAILURE);
+}
+
+/* Iterate over the blocks and uncompress. */
+static void *
+worker_thread (void *vp)
+{
+  struct per_thread_state *state = vp;
+  struct global_state *global = state->global;
   lzma_index_iter iter;
+  int err;
+  off_t position, oposition;
   uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX];
   ssize_t n;
   lzma_block block;
@@ -359,28 +460,52 @@ iter_blocks (lzma_index *idx,
   lzma_stream strm = LZMA_STREAM_INIT;
   char outbuf[BUFFER_SIZE];
   size_t i;
+  lzma_bool iter_finished;
 
-  lzma_index_iter_init (&iter, idx);
-  while (!lzma_index_iter_next (&iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK)) {
-    /* Seek to the start of the block in the input file. */
-    if (lseek (fd, iter.block.compressed_file_offset, SEEK_SET) == -1)
-      error (EXIT_FAILURE, errno, "lseek");
+  state->status = -1;
+
+  for (;;) {
+    /* Get the next block. */
+    err = pthread_mutex_lock (&global->iter_mutex);
+    if (err != 0) abort ();
+    iter_finished = global->iter_finished;
+    if (!iter_finished) {
+      iter_finished = global->iter_finished =
+        lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK);
+      if (!iter_finished)
+        /* Take a local copy of this iterator since another thread will
+         * update the global version.
+         */
+        iter = global->iter;
+    }
+    err = pthread_mutex_unlock (&global->iter_mutex);
+    if (err != 0) abort ();
+    if (iter_finished)
+      break;
 
     /* Read the block header.  Start by reading a single byte which
      * tell us how big the block header is.
      */
-    n = read (fd, header, 1);
-    if (n == 0)
-      error (EXIT_FAILURE, 0,
+    position = iter.block.compressed_file_offset;
+    n = pread (global->fd, header, 1, position);
+    if (n == 0) {
+      error (0, 0,
              "%s: read: unexpected end of file reading block header byte",
-             filename);
-    if (n == -1)
-      error (EXIT_FAILURE, errno, "%s: read", filename);
+             global->filename);
+      return &state->status;
+    }
+    if (n == -1) {
+      error (0, errno, "%s: read", global->filename);
+      return &state->status;
+    }
+    position++;
 
-    if (header[0] == '\0')
-      error (EXIT_FAILURE, errno,
+    if (header[0] == '\0') {
+      error (0, errno,
              "%s: read: unexpected invalid block in file, header[0] = 0",
-             filename);
+             global->filename);
+      return &state->status;
+    }
 
     block.version = 0;
     block.check = iter.stream.flags->check;
@@ -388,31 +513,46 @@ iter_blocks (lzma_index *idx,
     block.header_size = lzma_block_header_size_decode (header[0]);
 
     /* Now read and decode the block header. */
-    n = read (fd, &header[1], block.header_size-1);
-    if (n >= 0 && n != block.header_size-1)
-      error (EXIT_FAILURE, 0,
+    n = pread (global->fd, &header[1], block.header_size-1, position);
+    if (n >= 0 && n != block.header_size-1) {
+      error (0, 0,
              "%s: read: unexpected end of file reading block header",
-             filename);
-    if (n == -1)
-      error (EXIT_FAILURE, errno, "%s: read", filename);
+             global->filename);
+      return &state->status;
+    }
+    if (n == -1) {
+      error (0, errno, "%s: read", global->filename);
+      return &state->status;
+    }
+    position += n;
 
     r = lzma_block_header_decode (&block, NULL, header);
-    if (r != LZMA_OK)
-      error (EXIT_FAILURE, errno, "%s: invalid block header (error %d)",
-             filename, r);
+    if (r != LZMA_OK) {
+      error (0, errno, "%s: invalid block header (error %d)",
+             global->filename, r);
+      return &state->status;
+    }
 
     /* What this actually does is it checks that the block header
      * matches the index.
      */
     r = lzma_block_compressed_size (&block, iter.block.unpadded_size);
-    if (r != LZMA_OK)
-      error (EXIT_FAILURE, errno,
-             "%s: cannot calculate compressed size (error %d)", filename, r);
+    if (r != LZMA_OK) {
+      error (0, errno,
+             "%s: cannot calculate compressed size (error %d)",
+             global->filename, r);
+      return &state->status;
+    }
+
+    /* Where we will start writing to. */
+    oposition = iter.block.uncompressed_file_offset;
 
     /* Read the block data and uncompress it. */
     r = lzma_block_decoder (&strm, &block);
-    if (r != LZMA_OK)
-      error (EXIT_FAILURE, 0, "%s: invalid block (error %d)", filename, r);
+    if (r != LZMA_OK) {
+      error (0, 0, "%s: invalid block (error %d)", global->filename, r);
+      return &state->status;
+    }
 
     strm.next_in = NULL;
     strm.avail_in = 0;
@@ -425,9 +565,12 @@ iter_blocks (lzma_index *idx,
 
       if (strm.avail_in == 0) {
         strm.next_in = buf;
-        n = read (fd, buf, sizeof buf);
-        if (n == -1)
-          error (EXIT_FAILURE, errno, "%s: read", filename);
+        n = pread (global->fd, buf, sizeof buf, position);
+        if (n == -1) {
+          error (0, errno, "%s: read", global->filename);
+          return &state->status;
+        }
+        position += n;
         strm.avail_in = n;
         if (n == 0)
           action = LZMA_FINISH;
@@ -438,14 +581,17 @@ iter_blocks (lzma_index *idx,
       if (strm.avail_out == 0 || r == LZMA_STREAM_END) {
         size_t wsz = sizeof outbuf - strm.avail_out;
 
-        if (is_zero (outbuf, wsz)) { /* Seek to preserve sparseness. */
-          if (lseek (ofd, wsz, SEEK_CUR) == (off_t) -1)
-            error (EXIT_FAILURE, errno, "%s: seek", filename);
-        } else {
-          if (write (ofd, outbuf, wsz) != wsz)
+        /* Don't write if the block is all zero, to preserve output file
+         * sparseness.  However we have to update oposition.
+         */
+        if (!is_zero (outbuf, wsz)) {
+          if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) {
             /* XXX Handle short writes. */
-            error (EXIT_FAILURE, errno, "%s: write", filename);
+            error (0, errno, "%s: write", global->filename);
+            return &state->status;
+          }
         }
+        oposition += wsz;
 
         strm.next_out = outbuf;
         strm.avail_out = sizeof outbuf;
@@ -453,9 +599,12 @@ iter_blocks (lzma_index *idx,
 
       if (r == LZMA_STREAM_END)
         break;
-      if (r != LZMA_OK)
-        error (EXIT_FAILURE, 0,
-               "%s: could not parse block data (error %d)", filename, r);
+      if (r != LZMA_OK) {
+        error (0, 0,
+               "%s: could not parse block data (error %d)",
+               global->filename, r);
+        return &state->status;
+      }
     }
 
     lzma_end (&strm);
@@ -463,4 +612,7 @@ iter_blocks (lzma_index *idx,
     for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
       free (filters[i].options);
   }
+
+  state->status = 0;
+  return &state->status;
 }