X-Git-Url: http://git.annexia.org/?a=blobdiff_plain;f=pxzcat.c;h=35d0422a7b360cbbfb9f861d2dfa6eaeb9c570f5;hb=0176eab5339f4b552717cca8a57231efb16dc2a6;hp=0ac0eb9337ddadd5494bb63e2aa1f7ccf0c3fdca;hpb=37b3b03e236062c2681ec89118c125ec784e9fa3;p=pxzcat.git diff --git a/pxzcat.c b/pxzcat.c index 0ac0eb9..35d0422 100644 --- a/pxzcat.c +++ b/pxzcat.c @@ -44,41 +44,47 @@ #include #include #include +#include #include #define DEBUG 1 #if DEBUG -#define debug(fs,...) fprintf (stderr, "pxzcat: debug: " fs, ## __VA_ARGS__) +#define debug(fs,...) fprintf (stderr, "pxzcat: debug: " fs "\n", ## __VA_ARGS__) #else #define debug(fs,...) /* nothing */ #endif +/* Size of buffers used in decompression loop. */ +#define BUFFER_SIZE (64*1024) + #define XZ_HEADER_MAGIC "\xfd" "7zXZ\0" #define XZ_HEADER_MAGIC_LEN 6 #define XZ_FOOTER_MAGIC "YZ" #define XZ_FOOTER_MAGIC_LEN 2 static void usage (int exitcode); -static void xzfile_uncompress (const char *filename, const char *outputfile); +static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads); static int check_header_magic (int fd); static lzma_index *parse_indexes (const char *filename, int fd); -static void iter_indexes (lzma_index *idx); +static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd); static struct option long_options[] = { { "output", required_argument, 0, 'o' }, + { "threads", required_argument, 0, 'T' }, { "help", 0, 0, '?' }, { NULL, 0, 0, 0 } }; -static const char *options = "o:"; +static const char *options = "o:T:"; int main (int argc, char *argv[]) { int c; int longopt_index; + unsigned nr_threads = 0; const char *outputfile = NULL; for (;;) { @@ -95,6 +101,11 @@ main (int argc, char *argv[]) outputfile = optarg; break; + case 'T': + if (sscanf (optarg, "%u", &nr_threads) != 1) + error (EXIT_FAILURE, 0, "cannot parse -T option"); + break; + case '?': usage (EXIT_SUCCESS); @@ -103,13 +114,21 @@ main (int argc, char *argv[]) } } - if (outputfile == NULL) - error (EXIT_FAILURE, 0, "you must give the -o (output file) option\n"); - if (optind != argc - 1) usage (EXIT_FAILURE); - xzfile_uncompress (argv[optind], outputfile); + if (outputfile == NULL) + error (EXIT_FAILURE, 0, "you must give the -o (output file) option"); + + /* -T 0 (default) means use all cores. */ + if (nr_threads == 0) { + long i = sysconf (_SC_NPROCESSORS_ONLN); + if (i <= 0) + error (EXIT_FAILURE, errno, "could not get number of cores"); + nr_threads = (unsigned) i; + } + + xzfile_uncompress (argv[optind], outputfile, nr_threads); exit (EXIT_SUCCESS); } @@ -117,14 +136,15 @@ main (int argc, char *argv[]) static void usage (int exitcode) { - printf ("usage: pxzcat -o output input.xz\n"); + printf ("usage: pxzcat -o output [-T #threads] input.xz\n"); exit (exitcode); } static void -xzfile_uncompress (const char *filename, const char *outputfile) +xzfile_uncompress (const char *filename, const char *outputfile, + unsigned nr_threads) { - int fd; + int fd, ofd; uint64_t size; lzma_index *idx; @@ -140,8 +160,18 @@ xzfile_uncompress (const char *filename, const char *outputfile) /* Read and parse the indexes. */ idx = parse_indexes (filename, fd); - /* Iterate over indexes and uncompress. */ - iter_indexes (idx); + /* Get the file uncompressed size, create the output file. */ + size = lzma_index_uncompressed_size (idx); + debug ("uncompressed size = %" PRIu64 " bytes", size); + + ofd = open (outputfile, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0644); + if (ofd == -1) + error (EXIT_FAILURE, errno, "open: %s", outputfile); + if (ftruncate (ofd, size) == -1) + error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile); + + /* Iterate over blocks. */ + iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd); close (fd); } @@ -315,166 +345,274 @@ parse_indexes (const char *filename, int fd) return combined_index; } -/* Iterate over the indexes and uncompress. +/* Return true iff the buffer is all zero bytes. + * + * Note that gcc is smart enough to optimize this properly: + * http://stackoverflow.com/questions/1493936/faster-means-of-checking-for-an-empty-buffer-in-c/1493989#1493989 */ -static void -iter_indexes (lzma_index *idx) +static inline int +is_zero (const char *buffer, size_t size) { + size_t i; + + for (i = 0; i < size; ++i) { + if (buffer[i] != 0) + return 0; + } + + return 1; +} + +struct global_state { + /* Current iterator. Threads update this, but it is protected by a + * mutex, and each thread takes a copy of it when working on it. + */ lzma_index_iter iter; + lzma_bool iter_finished; + pthread_mutex_t iter_mutex; - lzma_index_iter_init (&iter, idx); - while (!lzma_index_iter_next (&iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK)) { - abort (); + /* Note that all threads are accessing these fds, so you have + * to use pread/pwrite instead of lseek! + */ + + /* Input file. */ + const char *filename; + int fd; + /* Output file. */ + const char *outputfile; + int ofd; +}; +struct per_thread_state { + unsigned thread_num; + struct global_state *global; + int status; +}; +/* Create threads to iterate over the blocks and uncompress. */ +static void *worker_thread (void *vp); +static void +iter_blocks (lzma_index *idx, unsigned nr_threads, + const char *filename, int fd, const char *outputfile, int ofd) +{ + struct global_state global; + struct per_thread_state per_thread[nr_threads]; + pthread_t thread[nr_threads]; + unsigned u, nr_errors; + int err; + void *status; + + lzma_index_iter_init (&global.iter, idx); + global.iter_finished = 0; + err = pthread_mutex_init (&global.iter_mutex, NULL); + if (err != 0) + error (EXIT_FAILURE, err, "pthread_mutex_init"); + + global.filename = filename; + global.fd = fd; + global.outputfile = outputfile; + global.ofd = ofd; + + for (u = 0; u < nr_threads; ++u) { + per_thread[u].thread_num = u; + per_thread[u].global = &global; } + + /* Start the threads. */ + for (u = 0; u < nr_threads; ++u) { + err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]); + if (err != 0) + error (EXIT_FAILURE, err, "pthread_create (%u)", u); + } + + /* Wait for the threads to exit. */ + nr_errors = 0; + for (u = 0; u < nr_threads; ++u) { + err = pthread_join (thread[u], &status); + if (err != 0) { + error (0, err, "pthread_join (%u)", u); + nr_errors++; + } + if (*(int *)status == -1) + nr_errors++; + } + + if (nr_errors > 0) + exit (EXIT_FAILURE); } -#if 0 -char * -xzfile_read_block (xzfile *xz, uint64_t offset, - uint64_t *start_rtn, uint64_t *size_rtn) +/* Iterate over the blocks and uncompress. */ +static void * +worker_thread (void *vp) { + struct per_thread_state *state = vp; + struct global_state *global = state->global; lzma_index_iter iter; + int err; + off_t position, oposition; uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX]; + ssize_t n; lzma_block block; lzma_filter filters[LZMA_FILTERS_MAX + 1]; lzma_ret r; lzma_stream strm = LZMA_STREAM_INIT; - char *data; - ssize_t n; + char outbuf[BUFFER_SIZE]; size_t i; + lzma_bool iter_finished; - /* Locate the block containing the uncompressed offset. */ - lzma_index_iter_init (&iter, xz->idx); - if (lzma_index_iter_locate (&iter, offset)) { - nbdkit_error ("cannot find offset %" PRIu64 " in the xz file", offset); - return NULL; - } - - *start_rtn = iter.block.uncompressed_file_offset; - *size_rtn = iter.block.uncompressed_size; + state->status = -1; - nbdkit_debug ("seek: block number %d at file offset %" PRIu64, - (int) iter.block.number_in_file, - (uint64_t) iter.block.compressed_file_offset); - - if (lseek (xz->fd, iter.block.compressed_file_offset, SEEK_SET) == -1) { - nbdkit_error ("lseek: %m"); - return NULL; - } - - /* Read the block header. Start by reading a single byte which - * tell us how big the block header is. - */ - n = read (xz->fd, header, 1); - if (n == 0) { - nbdkit_error ("read: unexpected end of file reading block header byte"); - return NULL; - } - if (n == -1) { - nbdkit_error ("read: %m"); - return NULL; - } + for (;;) { + /* Get the next block. */ + err = pthread_mutex_lock (&global->iter_mutex); + if (err != 0) abort (); + iter_finished = global->iter_finished; + if (!iter_finished) { + iter_finished = global->iter_finished = + lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK); + if (!iter_finished) + /* Take a local copy of this iterator since another thread will + * update the global version. + */ + iter = global->iter; + } + err = pthread_mutex_unlock (&global->iter_mutex); + if (err != 0) abort (); + if (iter_finished) + break; - if (header[0] == '\0') { - nbdkit_error ("read: unexpected invalid block in file, header[0] = 0"); - return NULL; - } + /* Read the block header. Start by reading a single byte which + * tell us how big the block header is. + */ + position = iter.block.compressed_file_offset; + n = pread (global->fd, header, 1, position); + if (n == 0) { + error (0, 0, + "%s: read: unexpected end of file reading block header byte", + global->filename); + return &state->status; + } + if (n == -1) { + error (0, errno, "%s: read", global->filename); + return &state->status; + } + position++; - block.version = 0; - block.check = iter.stream.flags->check; - block.filters = filters; - block.header_size = lzma_block_header_size_decode (header[0]); + if (header[0] == '\0') { + error (0, errno, + "%s: read: unexpected invalid block in file, header[0] = 0", + global->filename); + return &state->status; + } - /* Now read and decode the block header. */ - n = read (xz->fd, &header[1], block.header_size-1); - if (n >= 0 && n != block.header_size-1) { - nbdkit_error ("read: unexpected end of file reading block header"); - return NULL; - } - if (n == -1) { - nbdkit_error ("read: %m"); - return NULL; - } + block.version = 0; + block.check = iter.stream.flags->check; + block.filters = filters; + block.header_size = lzma_block_header_size_decode (header[0]); + + /* Now read and decode the block header. */ + n = pread (global->fd, &header[1], block.header_size-1, position); + if (n >= 0 && n != block.header_size-1) { + error (0, 0, + "%s: read: unexpected end of file reading block header", + global->filename); + return &state->status; + } + if (n == -1) { + error (0, errno, "%s: read", global->filename); + return &state->status; + } + position += n; - r = lzma_block_header_decode (&block, NULL, header); - if (r != LZMA_OK) { - nbdkit_error ("invalid block header (error %d)", r); - return NULL; - } + r = lzma_block_header_decode (&block, NULL, header); + if (r != LZMA_OK) { + error (0, errno, "%s: invalid block header (error %d)", + global->filename, r); + return &state->status; + } - /* What this actually does is it checks that the block header - * matches the index. - */ - r = lzma_block_compressed_size (&block, iter.block.unpadded_size); - if (r != LZMA_OK) { - nbdkit_error ("cannot calculate compressed size (error %d)", r); - goto err1; - } + /* What this actually does is it checks that the block header + * matches the index. + */ + r = lzma_block_compressed_size (&block, iter.block.unpadded_size); + if (r != LZMA_OK) { + error (0, errno, + "%s: cannot calculate compressed size (error %d)", + global->filename, r); + return &state->status; + } - /* Read the block data. */ - r = lzma_block_decoder (&strm, &block); - if (r != LZMA_OK) { - nbdkit_error ("invalid block (error %d)", r); - goto err1; - } + /* Where we will start writing to. */ + oposition = iter.block.uncompressed_file_offset; - data = malloc (*size_rtn); - if (data == NULL) { - nbdkit_error ("malloc (%zu bytes): %m\n" - "NOTE: If this error occurs, you need to recompress your xz files with a smaller block size. Use: 'xz --block-size=16777216 ...'.", - *size_rtn); - goto err1; - } + /* Read the block data and uncompress it. */ + r = lzma_block_decoder (&strm, &block); + if (r != LZMA_OK) { + error (0, 0, "%s: invalid block (error %d)", global->filename, r); + return &state->status; + } - strm.next_in = NULL; - strm.avail_in = 0; - strm.next_out = (uint8_t *) data; - strm.avail_out = block.uncompressed_size; + strm.next_in = NULL; + strm.avail_in = 0; + strm.next_out = outbuf; + strm.avail_out = sizeof outbuf; + + for (;;) { + uint8_t buf[BUFFER_SIZE]; + lzma_action action = LZMA_RUN; + + if (strm.avail_in == 0) { + strm.next_in = buf; + n = pread (global->fd, buf, sizeof buf, position); + if (n == -1) { + error (0, errno, "%s: read", global->filename); + return &state->status; + } + position += n; + strm.avail_in = n; + if (n == 0) + action = LZMA_FINISH; + } - do { - uint8_t buf[BUFSIZ]; - lzma_action action = LZMA_RUN; + r = lzma_code (&strm, action); + + if (strm.avail_out == 0 || r == LZMA_STREAM_END) { + size_t wsz = sizeof outbuf - strm.avail_out; + + /* Don't write if the block is all zero, to preserve output file + * sparseness. However we have to update oposition. + */ + if (!is_zero (outbuf, wsz)) { + if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) { + /* XXX Handle short writes. */ + error (0, errno, "%s: write", global->filename); + return &state->status; + } + } + oposition += wsz; + + strm.next_out = outbuf; + strm.avail_out = sizeof outbuf; + } - if (strm.avail_in == 0) { - strm.next_in = buf; - n = read (xz->fd, buf, sizeof buf); - if (n == -1) { - nbdkit_error ("read: %m"); - goto err2; + if (r == LZMA_STREAM_END) + break; + if (r != LZMA_OK) { + error (0, 0, + "%s: could not parse block data (error %d)", + global->filename, r); + return &state->status; } - strm.avail_in = n; - if (n == 0) - action = LZMA_FINISH; } - strm.avail_in = n; - strm.next_in = buf; - r = lzma_code (&strm, action); - } while (r == LZMA_OK); + lzma_end (&strm); - if (r != LZMA_OK && r != LZMA_STREAM_END) { - nbdkit_error ("could not parse block data (error %d)", r); - goto err2; + for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i) + free (filters[i].options); } - lzma_end (&strm); - - for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i) - free (filters[i].options); - - return data; - - err2: - free (data); - lzma_end (&strm); - err1: - for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i) - free (filters[i].options); - - return NULL; + state->status = 0; + return &state->status; } -#endif