#include <error.h>
#include <errno.h>
#include <getopt.h>
+#include <pthread.h>
#include <lzma.h>
#define debug(fs,...) /* nothing */
#endif
+/* Size of buffers used in decompression loop. */
+#define BUFFER_SIZE (64*1024)
+
#define XZ_HEADER_MAGIC "\xfd" "7zXZ\0"
#define XZ_HEADER_MAGIC_LEN 6
#define XZ_FOOTER_MAGIC "YZ"
#define XZ_FOOTER_MAGIC_LEN 2
static void usage (int exitcode);
-static void xzfile_uncompress (const char *filename, const char *outputfile);
+static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads);
static int check_header_magic (int fd);
static lzma_index *parse_indexes (const char *filename, int fd);
-static void iter_blocks (lzma_index *idx, const char *filename, int fd, const char *outputfile, int ofd);
+static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd);
static struct option long_options[] = {
{ "output", required_argument, 0, 'o' },
+ { "threads", required_argument, 0, 'T' },
{ "help", 0, 0, '?' },
{ NULL, 0, 0, 0 }
};
-static const char *options = "o:";
+static const char *options = "o:T:";
int
main (int argc, char *argv[])
{
int c;
int longopt_index;
+ unsigned nr_threads = 0;
const char *outputfile = NULL;
for (;;) {
outputfile = optarg;
break;
+ case 'T':
+ if (sscanf (optarg, "%u", &nr_threads) != 1)
+ error (EXIT_FAILURE, 0, "cannot parse -T option");
+ break;
+
case '?':
usage (EXIT_SUCCESS);
}
}
- if (outputfile == NULL)
- error (EXIT_FAILURE, 0, "you must give the -o (output file) option\n");
-
if (optind != argc - 1)
usage (EXIT_FAILURE);
- xzfile_uncompress (argv[optind], outputfile);
+ if (outputfile == NULL)
+ error (EXIT_FAILURE, 0, "you must give the -o (output file) option");
+
+ /* -T 0 (default) means use all cores. */
+ if (nr_threads == 0) {
+ long i = sysconf (_SC_NPROCESSORS_ONLN);
+ if (i <= 0)
+ error (EXIT_FAILURE, errno, "could not get number of cores");
+ nr_threads = (unsigned) i;
+ }
+
+ xzfile_uncompress (argv[optind], outputfile, nr_threads);
exit (EXIT_SUCCESS);
}
static void
usage (int exitcode)
{
- printf ("usage: pxzcat -o output input.xz\n");
+ printf ("usage: pxzcat -o output [-T #threads] input.xz\n");
exit (exitcode);
}
static void
-xzfile_uncompress (const char *filename, const char *outputfile)
+xzfile_uncompress (const char *filename, const char *outputfile,
+ unsigned nr_threads)
{
int fd, ofd;
uint64_t size;
if (ftruncate (ofd, size) == -1)
error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile);
- /* Iterate over blocks and uncompress. */
- iter_blocks (idx, filename, fd, outputfile, ofd);
+ /* Iterate over blocks. */
+ iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd);
close (fd);
}
return combined_index;
}
-#define BUFFER_SIZE (64*1024)
-
/* Return true iff the buffer is all zero bytes.
*
* Note that gcc is smart enough to optimize this properly:
return 1;
}
-/* Iterate over the blocks and uncompress. */
+struct global_state {
+ /* Current iterator. Threads update this, but it is protected by a
+ * mutex, and each thread takes a copy of it when working on it.
+ */
+ lzma_index_iter iter;
+ lzma_bool iter_finished;
+ pthread_mutex_t iter_mutex;
+
+ /* Note that all threads are accessing these fds, so you have
+ * to use pread/pwrite instead of lseek!
+ */
+
+ /* Input file. */
+ const char *filename;
+ int fd;
+
+ /* Output file. */
+ const char *outputfile;
+ int ofd;
+};
+
+struct per_thread_state {
+ unsigned thread_num;
+ struct global_state *global;
+ int status;
+};
+
+/* Create threads to iterate over the blocks and uncompress. */
+static void *worker_thread (void *vp);
+
static void
-iter_blocks (lzma_index *idx,
+iter_blocks (lzma_index *idx, unsigned nr_threads,
const char *filename, int fd, const char *outputfile, int ofd)
{
+ struct global_state global;
+ struct per_thread_state per_thread[nr_threads];
+ pthread_t thread[nr_threads];
+ unsigned u, nr_errors;
+ int err;
+ void *status;
+
+ lzma_index_iter_init (&global.iter, idx);
+ global.iter_finished = 0;
+ err = pthread_mutex_init (&global.iter_mutex, NULL);
+ if (err != 0)
+ error (EXIT_FAILURE, err, "pthread_mutex_init");
+
+ global.filename = filename;
+ global.fd = fd;
+ global.outputfile = outputfile;
+ global.ofd = ofd;
+
+ for (u = 0; u < nr_threads; ++u) {
+ per_thread[u].thread_num = u;
+ per_thread[u].global = &global;
+ }
+
+ /* Start the threads. */
+ for (u = 0; u < nr_threads; ++u) {
+ err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]);
+ if (err != 0)
+ error (EXIT_FAILURE, err, "pthread_create (%u)", u);
+ }
+
+ /* Wait for the threads to exit. */
+ nr_errors = 0;
+ for (u = 0; u < nr_threads; ++u) {
+ err = pthread_join (thread[u], &status);
+ if (err != 0) {
+ error (0, err, "pthread_join (%u)", u);
+ nr_errors++;
+ }
+ if (*(int *)status == -1)
+ nr_errors++;
+ }
+
+ if (nr_errors > 0)
+ exit (EXIT_FAILURE);
+}
+
+/* Iterate over the blocks and uncompress. */
+static void *
+worker_thread (void *vp)
+{
+ struct per_thread_state *state = vp;
+ struct global_state *global = state->global;
lzma_index_iter iter;
+ int err;
+ off_t position, oposition;
uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX];
ssize_t n;
lzma_block block;
lzma_stream strm = LZMA_STREAM_INIT;
char outbuf[BUFFER_SIZE];
size_t i;
+ lzma_bool iter_finished;
- lzma_index_iter_init (&iter, idx);
- while (!lzma_index_iter_next (&iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK)) {
- /* Seek to the start of the block in the input file. */
- if (lseek (fd, iter.block.compressed_file_offset, SEEK_SET) == -1)
- error (EXIT_FAILURE, errno, "lseek");
+ state->status = -1;
+
+ for (;;) {
+ /* Get the next block. */
+ err = pthread_mutex_lock (&global->iter_mutex);
+ if (err != 0) abort ();
+ iter_finished = global->iter_finished;
+ if (!iter_finished) {
+ iter_finished = global->iter_finished =
+ lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK);
+ if (!iter_finished)
+ /* Take a local copy of this iterator since another thread will
+ * update the global version.
+ */
+ iter = global->iter;
+ }
+ err = pthread_mutex_unlock (&global->iter_mutex);
+ if (err != 0) abort ();
+ if (iter_finished)
+ break;
/* Read the block header. Start by reading a single byte which
* tell us how big the block header is.
*/
- n = read (fd, header, 1);
- if (n == 0)
- error (EXIT_FAILURE, 0,
+ position = iter.block.compressed_file_offset;
+ n = pread (global->fd, header, 1, position);
+ if (n == 0) {
+ error (0, 0,
"%s: read: unexpected end of file reading block header byte",
- filename);
- if (n == -1)
- error (EXIT_FAILURE, errno, "%s: read", filename);
+ global->filename);
+ return &state->status;
+ }
+ if (n == -1) {
+ error (0, errno, "%s: read", global->filename);
+ return &state->status;
+ }
+ position++;
- if (header[0] == '\0')
- error (EXIT_FAILURE, errno,
+ if (header[0] == '\0') {
+ error (0, errno,
"%s: read: unexpected invalid block in file, header[0] = 0",
- filename);
+ global->filename);
+ return &state->status;
+ }
block.version = 0;
block.check = iter.stream.flags->check;
block.header_size = lzma_block_header_size_decode (header[0]);
/* Now read and decode the block header. */
- n = read (fd, &header[1], block.header_size-1);
- if (n >= 0 && n != block.header_size-1)
- error (EXIT_FAILURE, 0,
+ n = pread (global->fd, &header[1], block.header_size-1, position);
+ if (n >= 0 && n != block.header_size-1) {
+ error (0, 0,
"%s: read: unexpected end of file reading block header",
- filename);
- if (n == -1)
- error (EXIT_FAILURE, errno, "%s: read", filename);
+ global->filename);
+ return &state->status;
+ }
+ if (n == -1) {
+ error (0, errno, "%s: read", global->filename);
+ return &state->status;
+ }
+ position += n;
r = lzma_block_header_decode (&block, NULL, header);
- if (r != LZMA_OK)
- error (EXIT_FAILURE, errno, "%s: invalid block header (error %d)",
- filename, r);
+ if (r != LZMA_OK) {
+ error (0, errno, "%s: invalid block header (error %d)",
+ global->filename, r);
+ return &state->status;
+ }
/* What this actually does is it checks that the block header
* matches the index.
*/
r = lzma_block_compressed_size (&block, iter.block.unpadded_size);
- if (r != LZMA_OK)
- error (EXIT_FAILURE, errno,
- "%s: cannot calculate compressed size (error %d)", filename, r);
+ if (r != LZMA_OK) {
+ error (0, errno,
+ "%s: cannot calculate compressed size (error %d)",
+ global->filename, r);
+ return &state->status;
+ }
+
+ /* Where we will start writing to. */
+ oposition = iter.block.uncompressed_file_offset;
/* Read the block data and uncompress it. */
r = lzma_block_decoder (&strm, &block);
- if (r != LZMA_OK)
- error (EXIT_FAILURE, 0, "%s: invalid block (error %d)", filename, r);
+ if (r != LZMA_OK) {
+ error (0, 0, "%s: invalid block (error %d)", global->filename, r);
+ return &state->status;
+ }
strm.next_in = NULL;
strm.avail_in = 0;
if (strm.avail_in == 0) {
strm.next_in = buf;
- n = read (fd, buf, sizeof buf);
- if (n == -1)
- error (EXIT_FAILURE, errno, "%s: read", filename);
+ n = pread (global->fd, buf, sizeof buf, position);
+ if (n == -1) {
+ error (0, errno, "%s: read", global->filename);
+ return &state->status;
+ }
+ position += n;
strm.avail_in = n;
if (n == 0)
action = LZMA_FINISH;
if (strm.avail_out == 0 || r == LZMA_STREAM_END) {
size_t wsz = sizeof outbuf - strm.avail_out;
- if (is_zero (outbuf, wsz)) { /* Seek to preserve sparseness. */
- if (lseek (ofd, wsz, SEEK_CUR) == (off_t) -1)
- error (EXIT_FAILURE, errno, "%s: seek", filename);
- } else {
- if (write (ofd, outbuf, wsz) != wsz)
+ /* Don't write if the block is all zero, to preserve output file
+ * sparseness. However we have to update oposition.
+ */
+ if (!is_zero (outbuf, wsz)) {
+ if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) {
/* XXX Handle short writes. */
- error (EXIT_FAILURE, errno, "%s: write", filename);
+ error (0, errno, "%s: write", global->filename);
+ return &state->status;
+ }
}
+ oposition += wsz;
strm.next_out = outbuf;
strm.avail_out = sizeof outbuf;
if (r == LZMA_STREAM_END)
break;
- if (r != LZMA_OK)
- error (EXIT_FAILURE, 0,
- "%s: could not parse block data (error %d)", filename, r);
+ if (r != LZMA_OK) {
+ error (0, 0,
+ "%s: could not parse block data (error %d)",
+ global->filename, r);
+ return &state->status;
+ }
}
lzma_end (&strm);
for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
free (filters[i].options);
}
+
+ state->status = 0;
+ return &state->status;
}