Disable debugging.
[pxzcat.git] / pxzcat.c
index 5348be1..437d51e 100644 (file)
--- a/pxzcat.c
+++ b/pxzcat.c
 #include <sys/types.h>
 #include <error.h>
 #include <errno.h>
+#include <getopt.h>
+#include <pthread.h>
 
 #include <lzma.h>
 
-#define DEBUG 1
+#define DEBUG 0
 
 #if DEBUG
-#define debug(fs...) fprintf (stderr, "pxzcat: debug: " fs ##__VA_ARGS__)
+#define debug(fs,...) fprintf (stderr, "pxzcat: debug: " fs "\n", ## __VA_ARGS__)
 #else
-#define debug(fs...) /* nothing */
+#define debug(fs,...) /* nothing */
 #endif
 
+/* Size of buffers used in decompression loop. */
+#define BUFFER_SIZE (64*1024)
+
 #define XZ_HEADER_MAGIC     "\xfd" "7zXZ\0"
 #define XZ_HEADER_MAGIC_LEN 6
 #define XZ_FOOTER_MAGIC     "YZ"
 #define XZ_FOOTER_MAGIC_LEN 2
 
-static void xzfile_uncompress (const char *filename, const char *outputfile);
+static void usage (int exitcode);
+static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads);
 static int check_header_magic (int fd);
-static lzma_index *parse_indexes (const char *filename, int fd, size_t *);
-static void iter_indexes (lzma_index *idx);
+static lzma_index *parse_indexes (const char *filename, int fd);
+static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd);
 
 static struct option long_options[] = {
   { "output",   required_argument,  0, 'o' },
+  { "threads",  required_argument,  0, 'T' },
+  { "help",     0,                  0, '?' },
   { NULL,       0,                  0, 0   }
 };
 
-static const char *options = "o:";
+static const char *options = "o:T:";
 
 int
 main (int argc, char *argv[])
 {
   int c;
-  int optind;
+  int longopt_index;
+  unsigned nr_threads = 0;
   const char *outputfile = NULL;
 
   for (;;) {
-    c = getopt_long (argc, argv, options, long_options, &optind);
+    c = getopt_long (argc, argv, options, long_options, &longopt_index);
     if (c == -1)
       break;
 
@@ -92,33 +101,55 @@ main (int argc, char *argv[])
       outputfile = optarg;
       break;
 
+    case 'T':
+      if (sscanf (optarg, "%u", &nr_threads) != 1)
+        error (EXIT_FAILURE, 0, "cannot parse -T option");
+      break;
+
     case '?':
+      usage (EXIT_SUCCESS);
+
     default:
-      error (EXIT_FAILURE, 0, "usage: %s -o output file\n", argv[0]);
+      usage (EXIT_FAILURE);
     }
   }
 
-  if (outputfile == NULL)
-    error (EXIT_FAILURE, 0, "%s: you must give the -o (output file) option\n",
-           argv[0]);
-
   if (optind != argc - 1)
-    error (EXIT_FAILURE, 0, "%s: input.xz\n", argv[0]);
+    usage (EXIT_FAILURE);
 
-  xzfile_uncompress (argv[optind], outputfile);
+  if (outputfile == NULL)
+    error (EXIT_FAILURE, 0, "you must give the -o (output file) option");
+
+  /* -T 0 (default) means use all cores. */
+  if (nr_threads == 0) {
+    long i = sysconf (_SC_NPROCESSORS_ONLN);
+    if (i <= 0)
+      error (EXIT_FAILURE, errno, "could not get number of cores");
+    nr_threads = (unsigned) i;
+  }
+
+  xzfile_uncompress (argv[optind], outputfile, nr_threads);
 
   exit (EXIT_SUCCESS);
 }
 
 static void
-xzfile_uncompress (const char *filename, const char *outputfile)
+usage (int exitcode)
 {
-  int fd;
+  printf ("usage: pxzcat -o output [-T #threads] input.xz\n");
+  exit (exitcode);
+}
+
+static void
+xzfile_uncompress (const char *filename, const char *outputfile,
+                   unsigned nr_threads)
+{
+  int fd, ofd;
   uint64_t size;
   lzma_index *idx;
 
   /* Open the file. */
-  fd = open (filename, O_RDONLY|O_CLOEXEC);
+  fd = open (filename, O_RDONLY);
   if (fd == -1)
     error (EXIT_FAILURE, errno, "open: %s", filename);
 
@@ -129,8 +160,18 @@ xzfile_uncompress (const char *filename, const char *outputfile)
   /* Read and parse the indexes. */
   idx = parse_indexes (filename, fd);
 
-  /* Iterate over indexes and uncompress. */
-  iter_indexes (idx);
+  /* Get the file uncompressed size, create the output file. */
+  size = lzma_index_uncompressed_size (idx);
+  debug ("uncompressed size = %" PRIu64 " bytes", size);
+
+  ofd = open (outputfile, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0644);
+  if (ofd == -1)
+    error (EXIT_FAILURE, errno, "open: %s", outputfile);
+  if (ftruncate (ofd, size) == -1)
+    error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile);
+
+  /* Iterate over blocks. */
+  iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd);
 
   close (fd);
 }
@@ -246,7 +287,7 @@ parse_indexes (const char *filename, int fd)
       r = lzma_code (&strm, LZMA_RUN);
     } while (r == LZMA_OK);
 
-    if (r != LZMA_STREAM_END) {
+    if (r != LZMA_STREAM_END)
       error (EXIT_FAILURE, 0, "%s: could not parse index (error %d)",
              filename, r);
 
@@ -304,168 +345,274 @@ parse_indexes (const char *filename, int fd)
   return combined_index;
 }
 
-/* Iterate over the indexes and uncompress.
+/* Return true iff the buffer is all zero bytes.
+ *
+ * Note that gcc is smart enough to optimize this properly:
+ * http://stackoverflow.com/questions/1493936/faster-means-of-checking-for-an-empty-buffer-in-c/1493989#1493989
  */
-static void
-iter_indexes (lzma_index *idx)
+static inline int
+is_zero (const char *buffer, size_t size)
 {
+  size_t i;
+
+  for (i = 0; i < size; ++i) {
+    if (buffer[i] != 0)
+      return 0;
+  }
+
+  return 1;
+}
+
+struct global_state {
+  /* Current iterator.  Threads update this, but it is protected by a
+   * mutex, and each thread takes a copy of it when working on it.
+   */
   lzma_index_iter iter;
+  lzma_bool iter_finished;
+  pthread_mutex_t iter_mutex;
+
+  /* Note that all threads are accessing these fds, so you have
+   * to use pread/pwrite instead of lseek!
+   */
+
+  /* Input file. */
+  const char *filename;
+  int fd;
 
-  lzma_index_iter_init (&iter, idx);
-  while (!lzma_index_iter_next (&iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK)) {
-    abort ();
+  /* Output file. */
+  const char *outputfile;
+  int ofd;
+};
+
+struct per_thread_state {
+  unsigned thread_num;
+  struct global_state *global;
+  int status;
+};
 
+/* Create threads to iterate over the blocks and uncompress. */
+static void *worker_thread (void *vp);
 
+static void
+iter_blocks (lzma_index *idx, unsigned nr_threads,
+             const char *filename, int fd, const char *outputfile, int ofd)
+{
+  struct global_state global;
+  struct per_thread_state per_thread[nr_threads];
+  pthread_t thread[nr_threads];
+  unsigned u, nr_errors;
+  int err;
+  void *status;
+
+  lzma_index_iter_init (&global.iter, idx);
+  global.iter_finished = 0;
+  err = pthread_mutex_init (&global.iter_mutex, NULL);
+  if (err != 0)
+    error (EXIT_FAILURE, err, "pthread_mutex_init");
+
+  global.filename = filename;
+  global.fd = fd;
+  global.outputfile = outputfile;
+  global.ofd = ofd;
+
+  for (u = 0; u < nr_threads; ++u) {
+    per_thread[u].thread_num = u;
+    per_thread[u].global = &global;
+  }
 
+  /* Start the threads. */
+  for (u = 0; u < nr_threads; ++u) {
+    err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]);
+    if (err != 0)
+      error (EXIT_FAILURE, err, "pthread_create (%u)", u);
+  }
 
+  /* Wait for the threads to exit. */
+  nr_errors = 0;
+  for (u = 0; u < nr_threads; ++u) {
+    err = pthread_join (thread[u], &status);
+    if (err != 0) {
+      error (0, err, "pthread_join (%u)", u);
+      nr_errors++;
+    }
+    if (*(int *)status == -1)
+      nr_errors++;
   }
 
-  return 0;
+  if (nr_errors > 0)
+    exit (EXIT_FAILURE);
 }
 
-#if 0
-char *
-xzfile_read_block (xzfile *xz, uint64_t offset,
-                   uint64_t *start_rtn, uint64_t *size_rtn)
+/* Iterate over the blocks and uncompress. */
+static void *
+worker_thread (void *vp)
 {
+  struct per_thread_state *state = vp;
+  struct global_state *global = state->global;
   lzma_index_iter iter;
+  int err;
+  off_t position, oposition;
   uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX];
+  ssize_t n;
   lzma_block block;
   lzma_filter filters[LZMA_FILTERS_MAX + 1];
   lzma_ret r;
   lzma_stream strm = LZMA_STREAM_INIT;
-  char *data;
-  ssize_t n;
+  char outbuf[BUFFER_SIZE];
   size_t i;
+  lzma_bool iter_finished;
 
-  /* Locate the block containing the uncompressed offset. */
-  lzma_index_iter_init (&iter, xz->idx);
-  if (lzma_index_iter_locate (&iter, offset)) {
-    nbdkit_error ("cannot find offset %" PRIu64 " in the xz file", offset);
-    return NULL;
-  }
-
-  *start_rtn = iter.block.uncompressed_file_offset;
-  *size_rtn = iter.block.uncompressed_size;
-
-  nbdkit_debug ("seek: block number %d at file offset %" PRIu64,
-                (int) iter.block.number_in_file,
-                (uint64_t) iter.block.compressed_file_offset);
-
-  if (lseek (xz->fd, iter.block.compressed_file_offset, SEEK_SET) == -1) {
-    nbdkit_error ("lseek: %m");
-    return NULL;
-  }
+  state->status = -1;
 
-  /* Read the block header.  Start by reading a single byte which
-   * tell us how big the block header is.
-   */
-  n = read (xz->fd, header, 1);
-  if (n == 0) {
-    nbdkit_error ("read: unexpected end of file reading block header byte");
-    return NULL;
-  }
-  if (n == -1) {
-    nbdkit_error ("read: %m");
-    return NULL;
-  }
+  for (;;) {
+    /* Get the next block. */
+    err = pthread_mutex_lock (&global->iter_mutex);
+    if (err != 0) abort ();
+    iter_finished = global->iter_finished;
+    if (!iter_finished) {
+      iter_finished = global->iter_finished =
+        lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK);
+      if (!iter_finished)
+        /* Take a local copy of this iterator since another thread will
+         * update the global version.
+         */
+        iter = global->iter;
+    }
+    err = pthread_mutex_unlock (&global->iter_mutex);
+    if (err != 0) abort ();
+    if (iter_finished)
+      break;
 
-  if (header[0] == '\0') {
-    nbdkit_error ("read: unexpected invalid block in file, header[0] = 0");
-    return NULL;
-  }
+    /* Read the block header.  Start by reading a single byte which
+     * tell us how big the block header is.
+     */
+    position = iter.block.compressed_file_offset;
+    n = pread (global->fd, header, 1, position);
+    if (n == 0) {
+      error (0, 0,
+             "%s: read: unexpected end of file reading block header byte",
+             global->filename);
+      return &state->status;
+    }
+    if (n == -1) {
+      error (0, errno, "%s: read", global->filename);
+      return &state->status;
+    }
+    position++;
 
-  block.version = 0;
-  block.check = iter.stream.flags->check;
-  block.filters = filters;
-  block.header_size = lzma_block_header_size_decode (header[0]);
+    if (header[0] == '\0') {
+      error (0, errno,
+             "%s: read: unexpected invalid block in file, header[0] = 0",
+             global->filename);
+      return &state->status;
+    }
 
-  /* Now read and decode the block header. */
-  n = read (xz->fd, &header[1], block.header_size-1);
-  if (n >= 0 && n != block.header_size-1) {
-    nbdkit_error ("read: unexpected end of file reading block header");
-    return NULL;
-  }
-  if (n == -1) {
-    nbdkit_error ("read: %m");
-    return NULL;
-  }
+    block.version = 0;
+    block.check = iter.stream.flags->check;
+    block.filters = filters;
+    block.header_size = lzma_block_header_size_decode (header[0]);
+
+    /* Now read and decode the block header. */
+    n = pread (global->fd, &header[1], block.header_size-1, position);
+    if (n >= 0 && n != block.header_size-1) {
+      error (0, 0,
+             "%s: read: unexpected end of file reading block header",
+             global->filename);
+      return &state->status;
+    }
+    if (n == -1) {
+      error (0, errno, "%s: read", global->filename);
+      return &state->status;
+    }
+    position += n;
 
-  r = lzma_block_header_decode (&block, NULL, header);
-  if (r != LZMA_OK) {
-    nbdkit_error ("invalid block header (error %d)", r);
-    return NULL;
-  }
+    r = lzma_block_header_decode (&block, NULL, header);
+    if (r != LZMA_OK) {
+      error (0, errno, "%s: invalid block header (error %d)",
+             global->filename, r);
+      return &state->status;
+    }
 
-  /* What this actually does is it checks that the block header
-   * matches the index.
-   */
-  r = lzma_block_compressed_size (&block, iter.block.unpadded_size);
-  if (r != LZMA_OK) {
-    nbdkit_error ("cannot calculate compressed size (error %d)", r);
-    goto err1;
-  }
+    /* What this actually does is it checks that the block header
+     * matches the index.
+     */
+    r = lzma_block_compressed_size (&block, iter.block.unpadded_size);
+    if (r != LZMA_OK) {
+      error (0, errno,
+             "%s: cannot calculate compressed size (error %d)",
+             global->filename, r);
+      return &state->status;
+    }
 
-  /* Read the block data. */
-  r = lzma_block_decoder (&strm, &block);
-  if (r != LZMA_OK) {
-    nbdkit_error ("invalid block (error %d)", r);
-    goto err1;
-  }
+    /* Where we will start writing to. */
+    oposition = iter.block.uncompressed_file_offset;
 
-  data = malloc (*size_rtn);
-  if (data == NULL) {
-    nbdkit_error ("malloc (%zu bytes): %m\n"
-                  "NOTE: If this error occurs, you need to recompress your xz files with a smaller block size.  Use: 'xz --block-size=16777216 ...'.",
-                  *size_rtn);
-    goto err1;
-  }
+    /* Read the block data and uncompress it. */
+    r = lzma_block_decoder (&strm, &block);
+    if (r != LZMA_OK) {
+      error (0, 0, "%s: invalid block (error %d)", global->filename, r);
+      return &state->status;
+    }
 
-  strm.next_in = NULL;
-  strm.avail_in = 0;
-  strm.next_out = (uint8_t *) data;
-  strm.avail_out = block.uncompressed_size;
+    strm.next_in = NULL;
+    strm.avail_in = 0;
+    strm.next_out = outbuf;
+    strm.avail_out = sizeof outbuf;
+
+    for (;;) {
+      uint8_t buf[BUFFER_SIZE];
+      lzma_action action = LZMA_RUN;
+
+      if (strm.avail_in == 0) {
+        strm.next_in = buf;
+        n = pread (global->fd, buf, sizeof buf, position);
+        if (n == -1) {
+          error (0, errno, "%s: read", global->filename);
+          return &state->status;
+        }
+        position += n;
+        strm.avail_in = n;
+        if (n == 0)
+          action = LZMA_FINISH;
+      }
 
-  do {
-    uint8_t buf[BUFSIZ];
-    lzma_action action = LZMA_RUN;
+      r = lzma_code (&strm, action);
+
+      if (strm.avail_out == 0 || r == LZMA_STREAM_END) {
+        size_t wsz = sizeof outbuf - strm.avail_out;
+
+        /* Don't write if the block is all zero, to preserve output file
+         * sparseness.  However we have to update oposition.
+         */
+        if (!is_zero (outbuf, wsz)) {
+          if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) {
+            /* XXX Handle short writes. */
+            error (0, errno, "%s: write", global->filename);
+            return &state->status;
+          }
+        }
+        oposition += wsz;
+
+        strm.next_out = outbuf;
+        strm.avail_out = sizeof outbuf;
+      }
 
-    if (strm.avail_in == 0) {
-      strm.next_in = buf;
-      n = read (xz->fd, buf, sizeof buf);
-      if (n == -1) {
-        nbdkit_error ("read: %m");
-        goto err2;
+      if (r == LZMA_STREAM_END)
+        break;
+      if (r != LZMA_OK) {
+        error (0, 0,
+               "%s: could not parse block data (error %d)",
+               global->filename, r);
+        return &state->status;
       }
-      strm.avail_in = n;
-      if (n == 0)
-        action = LZMA_FINISH;
     }
 
-    strm.avail_in = n;
-    strm.next_in = buf;
-    r = lzma_code (&strm, action);
-  } while (r == LZMA_OK);
+    lzma_end (&strm);
 
-  if (r != LZMA_OK && r != LZMA_STREAM_END) {
-    nbdkit_error ("could not parse block data (error %d)", r);
-    goto err2;
+    for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
+      free (filters[i].options);
   }
 
-  lzma_end (&strm);
-
-  for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
-    free (filters[i].options);
-
-  return data;
-
- err2:
-  free (data);
-  lzma_end (&strm);
- err1:
-  for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
-    free (filters[i].options);
-
-  return NULL;
+  state->status = 0;
+  return &state->status;
 }
-#endif