#include <sys/types.h>
#include <error.h>
#include <errno.h>
+#include <getopt.h>
+#include <pthread.h>
#include <lzma.h>
#define DEBUG 1
#if DEBUG
-#define debug(fs...) fprintf (stderr, "pxzcat: debug: " fs ##__VA_ARGS__)
+#define debug(fs,...) fprintf (stderr, "pxzcat: debug: " fs "\n", ## __VA_ARGS__)
#else
-#define debug(fs...) /* nothing */
+#define debug(fs,...) /* nothing */
#endif
+/* Size of buffers used in decompression loop. */
+#define BUFFER_SIZE (64*1024)
+
#define XZ_HEADER_MAGIC "\xfd" "7zXZ\0"
#define XZ_HEADER_MAGIC_LEN 6
#define XZ_FOOTER_MAGIC "YZ"
#define XZ_FOOTER_MAGIC_LEN 2
-static void xzfile_uncompress (const char *filename, const char *outputfile);
+static void usage (int exitcode);
+static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads);
static int check_header_magic (int fd);
-static lzma_index *parse_indexes (const char *filename, int fd, size_t *);
-static void iter_indexes (lzma_index *idx);
+static lzma_index *parse_indexes (const char *filename, int fd);
+static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd);
static struct option long_options[] = {
{ "output", required_argument, 0, 'o' },
+ { "threads", required_argument, 0, 'T' },
+ { "help", 0, 0, '?' },
{ NULL, 0, 0, 0 }
};
-static const char *options = "o:";
+static const char *options = "o:T:";
int
main (int argc, char *argv[])
{
int c;
- int optind;
+ int longopt_index;
+ unsigned nr_threads = 0;
const char *outputfile = NULL;
for (;;) {
- c = getopt_long (argc, argv, options, long_options, &optind);
+ c = getopt_long (argc, argv, options, long_options, &longopt_index);
if (c == -1)
break;
outputfile = optarg;
break;
+ case 'T':
+ if (sscanf (optarg, "%u", &nr_threads) != 1)
+ error (EXIT_FAILURE, 0, "cannot parse -T option");
+ break;
+
case '?':
+ usage (EXIT_SUCCESS);
+
default:
- error (EXIT_FAILURE, 0, "usage: %s -o output file\n", argv[0]);
+ usage (EXIT_FAILURE);
}
}
- if (outputfile == NULL)
- error (EXIT_FAILURE, 0, "%s: you must give the -o (output file) option\n",
- argv[0]);
-
if (optind != argc - 1)
- error (EXIT_FAILURE, 0, "%s: input.xz\n", argv[0]);
+ usage (EXIT_FAILURE);
- xzfile_uncompress (argv[optind], outputfile);
+ if (outputfile == NULL)
+ error (EXIT_FAILURE, 0, "you must give the -o (output file) option");
+
+ /* -T 0 (default) means use all cores. */
+ if (nr_threads == 0) {
+ long i = sysconf (_SC_NPROCESSORS_ONLN);
+ if (i <= 0)
+ error (EXIT_FAILURE, errno, "could not get number of cores");
+ nr_threads = (unsigned) i;
+ }
+
+ xzfile_uncompress (argv[optind], outputfile, nr_threads);
exit (EXIT_SUCCESS);
}
static void
-xzfile_uncompress (const char *filename, const char *outputfile)
+usage (int exitcode)
{
- int fd;
+ printf ("usage: pxzcat -o output [-T #threads] input.xz\n");
+ exit (exitcode);
+}
+
+static void
+xzfile_uncompress (const char *filename, const char *outputfile,
+ unsigned nr_threads)
+{
+ int fd, ofd;
uint64_t size;
lzma_index *idx;
/* Open the file. */
- fd = open (filename, O_RDONLY|O_CLOEXEC);
+ fd = open (filename, O_RDONLY);
if (fd == -1)
error (EXIT_FAILURE, errno, "open: %s", filename);
/* Read and parse the indexes. */
idx = parse_indexes (filename, fd);
- /* Iterate over indexes and uncompress. */
- iter_indexes (idx);
+ /* Get the file uncompressed size, create the output file. */
+ size = lzma_index_uncompressed_size (idx);
+ debug ("uncompressed size = %" PRIu64 " bytes", size);
+
+ ofd = open (outputfile, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0644);
+ if (ofd == -1)
+ error (EXIT_FAILURE, errno, "open: %s", outputfile);
+ if (ftruncate (ofd, size) == -1)
+ error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile);
+
+ /* Iterate over blocks. */
+ iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd);
close (fd);
}
r = lzma_code (&strm, LZMA_RUN);
} while (r == LZMA_OK);
- if (r != LZMA_STREAM_END) {
+ if (r != LZMA_STREAM_END)
error (EXIT_FAILURE, 0, "%s: could not parse index (error %d)",
filename, r);
return combined_index;
}
-/* Iterate over the indexes and uncompress.
+/* Return true iff the buffer is all zero bytes.
+ *
+ * Note that gcc is smart enough to optimize this properly:
+ * http://stackoverflow.com/questions/1493936/faster-means-of-checking-for-an-empty-buffer-in-c/1493989#1493989
*/
-static void
-iter_indexes (lzma_index *idx)
+static inline int
+is_zero (const char *buffer, size_t size)
{
+ size_t i;
+
+ for (i = 0; i < size; ++i) {
+ if (buffer[i] != 0)
+ return 0;
+ }
+
+ return 1;
+}
+
+struct global_state {
+ /* Current iterator. Threads update this, but it is protected by a
+ * mutex, and each thread takes a copy of it when working on it.
+ */
lzma_index_iter iter;
+ lzma_bool iter_finished;
+ pthread_mutex_t iter_mutex;
+
+ /* Note that all threads are accessing these fds, so you have
+ * to use pread/pwrite instead of lseek!
+ */
+
+ /* Input file. */
+ const char *filename;
+ int fd;
- lzma_index_iter_init (&iter, idx);
- while (!lzma_index_iter_next (&iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK)) {
- abort ();
+ /* Output file. */
+ const char *outputfile;
+ int ofd;
+};
+
+struct per_thread_state {
+ unsigned thread_num;
+ struct global_state *global;
+ int status;
+};
+/* Create threads to iterate over the blocks and uncompress. */
+static void *worker_thread (void *vp);
+static void
+iter_blocks (lzma_index *idx, unsigned nr_threads,
+ const char *filename, int fd, const char *outputfile, int ofd)
+{
+ struct global_state global;
+ struct per_thread_state per_thread[nr_threads];
+ pthread_t thread[nr_threads];
+ unsigned u, nr_errors;
+ int err;
+ void *status;
+
+ lzma_index_iter_init (&global.iter, idx);
+ global.iter_finished = 0;
+ err = pthread_mutex_init (&global.iter_mutex, NULL);
+ if (err != 0)
+ error (EXIT_FAILURE, err, "pthread_mutex_init");
+
+ global.filename = filename;
+ global.fd = fd;
+ global.outputfile = outputfile;
+ global.ofd = ofd;
+
+ for (u = 0; u < nr_threads; ++u) {
+ per_thread[u].thread_num = u;
+ per_thread[u].global = &global;
+ }
+ /* Start the threads. */
+ for (u = 0; u < nr_threads; ++u) {
+ err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]);
+ if (err != 0)
+ error (EXIT_FAILURE, err, "pthread_create (%u)", u);
+ }
+ /* Wait for the threads to exit. */
+ nr_errors = 0;
+ for (u = 0; u < nr_threads; ++u) {
+ err = pthread_join (thread[u], &status);
+ if (err != 0) {
+ error (0, err, "pthread_join (%u)", u);
+ nr_errors++;
+ }
+ if (*(int *)status == -1)
+ nr_errors++;
}
- return 0;
+ if (nr_errors > 0)
+ exit (EXIT_FAILURE);
}
-#if 0
-char *
-xzfile_read_block (xzfile *xz, uint64_t offset,
- uint64_t *start_rtn, uint64_t *size_rtn)
+/* Iterate over the blocks and uncompress. */
+static void *
+worker_thread (void *vp)
{
+ struct per_thread_state *state = vp;
+ struct global_state *global = state->global;
lzma_index_iter iter;
+ int err;
+ off_t position, oposition;
uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX];
+ ssize_t n;
lzma_block block;
lzma_filter filters[LZMA_FILTERS_MAX + 1];
lzma_ret r;
lzma_stream strm = LZMA_STREAM_INIT;
- char *data;
- ssize_t n;
+ char outbuf[BUFFER_SIZE];
size_t i;
+ lzma_bool iter_finished;
- /* Locate the block containing the uncompressed offset. */
- lzma_index_iter_init (&iter, xz->idx);
- if (lzma_index_iter_locate (&iter, offset)) {
- nbdkit_error ("cannot find offset %" PRIu64 " in the xz file", offset);
- return NULL;
- }
-
- *start_rtn = iter.block.uncompressed_file_offset;
- *size_rtn = iter.block.uncompressed_size;
-
- nbdkit_debug ("seek: block number %d at file offset %" PRIu64,
- (int) iter.block.number_in_file,
- (uint64_t) iter.block.compressed_file_offset);
-
- if (lseek (xz->fd, iter.block.compressed_file_offset, SEEK_SET) == -1) {
- nbdkit_error ("lseek: %m");
- return NULL;
- }
+ state->status = -1;
- /* Read the block header. Start by reading a single byte which
- * tell us how big the block header is.
- */
- n = read (xz->fd, header, 1);
- if (n == 0) {
- nbdkit_error ("read: unexpected end of file reading block header byte");
- return NULL;
- }
- if (n == -1) {
- nbdkit_error ("read: %m");
- return NULL;
- }
+ for (;;) {
+ /* Get the next block. */
+ err = pthread_mutex_lock (&global->iter_mutex);
+ if (err != 0) abort ();
+ iter_finished = global->iter_finished;
+ if (!iter_finished) {
+ iter_finished = global->iter_finished =
+ lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK);
+ if (!iter_finished)
+ /* Take a local copy of this iterator since another thread will
+ * update the global version.
+ */
+ iter = global->iter;
+ }
+ err = pthread_mutex_unlock (&global->iter_mutex);
+ if (err != 0) abort ();
+ if (iter_finished)
+ break;
- if (header[0] == '\0') {
- nbdkit_error ("read: unexpected invalid block in file, header[0] = 0");
- return NULL;
- }
+ /* Read the block header. Start by reading a single byte which
+ * tell us how big the block header is.
+ */
+ position = iter.block.compressed_file_offset;
+ n = pread (global->fd, header, 1, position);
+ if (n == 0) {
+ error (0, 0,
+ "%s: read: unexpected end of file reading block header byte",
+ global->filename);
+ return &state->status;
+ }
+ if (n == -1) {
+ error (0, errno, "%s: read", global->filename);
+ return &state->status;
+ }
+ position++;
- block.version = 0;
- block.check = iter.stream.flags->check;
- block.filters = filters;
- block.header_size = lzma_block_header_size_decode (header[0]);
+ if (header[0] == '\0') {
+ error (0, errno,
+ "%s: read: unexpected invalid block in file, header[0] = 0",
+ global->filename);
+ return &state->status;
+ }
- /* Now read and decode the block header. */
- n = read (xz->fd, &header[1], block.header_size-1);
- if (n >= 0 && n != block.header_size-1) {
- nbdkit_error ("read: unexpected end of file reading block header");
- return NULL;
- }
- if (n == -1) {
- nbdkit_error ("read: %m");
- return NULL;
- }
+ block.version = 0;
+ block.check = iter.stream.flags->check;
+ block.filters = filters;
+ block.header_size = lzma_block_header_size_decode (header[0]);
+
+ /* Now read and decode the block header. */
+ n = pread (global->fd, &header[1], block.header_size-1, position);
+ if (n >= 0 && n != block.header_size-1) {
+ error (0, 0,
+ "%s: read: unexpected end of file reading block header",
+ global->filename);
+ return &state->status;
+ }
+ if (n == -1) {
+ error (0, errno, "%s: read", global->filename);
+ return &state->status;
+ }
+ position += n;
- r = lzma_block_header_decode (&block, NULL, header);
- if (r != LZMA_OK) {
- nbdkit_error ("invalid block header (error %d)", r);
- return NULL;
- }
+ r = lzma_block_header_decode (&block, NULL, header);
+ if (r != LZMA_OK) {
+ error (0, errno, "%s: invalid block header (error %d)",
+ global->filename, r);
+ return &state->status;
+ }
- /* What this actually does is it checks that the block header
- * matches the index.
- */
- r = lzma_block_compressed_size (&block, iter.block.unpadded_size);
- if (r != LZMA_OK) {
- nbdkit_error ("cannot calculate compressed size (error %d)", r);
- goto err1;
- }
+ /* What this actually does is it checks that the block header
+ * matches the index.
+ */
+ r = lzma_block_compressed_size (&block, iter.block.unpadded_size);
+ if (r != LZMA_OK) {
+ error (0, errno,
+ "%s: cannot calculate compressed size (error %d)",
+ global->filename, r);
+ return &state->status;
+ }
- /* Read the block data. */
- r = lzma_block_decoder (&strm, &block);
- if (r != LZMA_OK) {
- nbdkit_error ("invalid block (error %d)", r);
- goto err1;
- }
+ /* Where we will start writing to. */
+ oposition = iter.block.uncompressed_file_offset;
- data = malloc (*size_rtn);
- if (data == NULL) {
- nbdkit_error ("malloc (%zu bytes): %m\n"
- "NOTE: If this error occurs, you need to recompress your xz files with a smaller block size. Use: 'xz --block-size=16777216 ...'.",
- *size_rtn);
- goto err1;
- }
+ /* Read the block data and uncompress it. */
+ r = lzma_block_decoder (&strm, &block);
+ if (r != LZMA_OK) {
+ error (0, 0, "%s: invalid block (error %d)", global->filename, r);
+ return &state->status;
+ }
- strm.next_in = NULL;
- strm.avail_in = 0;
- strm.next_out = (uint8_t *) data;
- strm.avail_out = block.uncompressed_size;
+ strm.next_in = NULL;
+ strm.avail_in = 0;
+ strm.next_out = outbuf;
+ strm.avail_out = sizeof outbuf;
+
+ for (;;) {
+ uint8_t buf[BUFFER_SIZE];
+ lzma_action action = LZMA_RUN;
+
+ if (strm.avail_in == 0) {
+ strm.next_in = buf;
+ n = pread (global->fd, buf, sizeof buf, position);
+ if (n == -1) {
+ error (0, errno, "%s: read", global->filename);
+ return &state->status;
+ }
+ position += n;
+ strm.avail_in = n;
+ if (n == 0)
+ action = LZMA_FINISH;
+ }
- do {
- uint8_t buf[BUFSIZ];
- lzma_action action = LZMA_RUN;
+ r = lzma_code (&strm, action);
+
+ if (strm.avail_out == 0 || r == LZMA_STREAM_END) {
+ size_t wsz = sizeof outbuf - strm.avail_out;
+
+ /* Don't write if the block is all zero, to preserve output file
+ * sparseness. However we have to update oposition.
+ */
+ if (!is_zero (outbuf, wsz)) {
+ if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) {
+ /* XXX Handle short writes. */
+ error (0, errno, "%s: write", global->filename);
+ return &state->status;
+ }
+ }
+ oposition += wsz;
+
+ strm.next_out = outbuf;
+ strm.avail_out = sizeof outbuf;
+ }
- if (strm.avail_in == 0) {
- strm.next_in = buf;
- n = read (xz->fd, buf, sizeof buf);
- if (n == -1) {
- nbdkit_error ("read: %m");
- goto err2;
+ if (r == LZMA_STREAM_END)
+ break;
+ if (r != LZMA_OK) {
+ error (0, 0,
+ "%s: could not parse block data (error %d)",
+ global->filename, r);
+ return &state->status;
}
- strm.avail_in = n;
- if (n == 0)
- action = LZMA_FINISH;
}
- strm.avail_in = n;
- strm.next_in = buf;
- r = lzma_code (&strm, action);
- } while (r == LZMA_OK);
+ lzma_end (&strm);
- if (r != LZMA_OK && r != LZMA_STREAM_END) {
- nbdkit_error ("could not parse block data (error %d)", r);
- goto err2;
+ for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
+ free (filters[i].options);
}
- lzma_end (&strm);
-
- for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
- free (filters[i].options);
-
- return data;
-
- err2:
- free (data);
- lzma_end (&strm);
- err1:
- for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
- free (filters[i].options);
-
- return NULL;
+ state->status = 0;
+ return &state->status;
}
-#endif