/* pxzcat derived from nbdkit
 * Copyright (C) 2013 Red Hat Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 * * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *
 * * Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution.
 *
 * * Neither the name of Red Hat nor the names of its contributors may be
 * used to endorse or promote products derived from this software without
 * specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/* NOTE(review): the header names of the 14 #include directives were
 * missing from this file.  They have been reconstructed from the
 * identifiers the code uses -- please confirm against the upstream copy.
 */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <errno.h>
#include <error.h>
#include <sys/types.h>
#include <pthread.h>
#include <lzma.h>

#define DEBUG 0

#if DEBUG
#define debug(fs,...) \
  fprintf (stderr, "pxzcat: debug: " fs "\n", ## __VA_ARGS__)
#else
#define debug(fs,...) /* nothing */
#endif

/* Size of buffers used in decompression loop.
*/ #define BUFFER_SIZE (64*1024) #define XZ_HEADER_MAGIC "\xfd" "7zXZ\0" #define XZ_HEADER_MAGIC_LEN 6 #define XZ_FOOTER_MAGIC "YZ" #define XZ_FOOTER_MAGIC_LEN 2 static void usage (int exitcode); static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads); static int check_header_magic (int fd); static lzma_index *parse_indexes (const char *filename, int fd); static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd); static struct option long_options[] = { { "output", required_argument, 0, 'o' }, { "threads", required_argument, 0, 'T' }, { "help", 0, 0, '?' }, { NULL, 0, 0, 0 } }; static const char *options = "o:T:"; int main (int argc, char *argv[]) { int c; int longopt_index; unsigned nr_threads = 0; const char *outputfile = NULL; for (;;) { c = getopt_long (argc, argv, options, long_options, &longopt_index); if (c == -1) break; switch (c) { /* Long option with no short opt equivalent. */ case 0: abort (); case 'o': outputfile = optarg; break; case 'T': if (sscanf (optarg, "%u", &nr_threads) != 1) error (EXIT_FAILURE, 0, "cannot parse -T option"); break; case '?': usage (EXIT_SUCCESS); default: usage (EXIT_FAILURE); } } if (optind != argc - 1) usage (EXIT_FAILURE); if (outputfile == NULL) error (EXIT_FAILURE, 0, "you must give the -o (output file) option"); /* -T 0 (default) means use all cores. */ if (nr_threads == 0) { long i = sysconf (_SC_NPROCESSORS_ONLN); if (i <= 0) error (EXIT_FAILURE, errno, "could not get number of cores"); nr_threads = (unsigned) i; } debug ("nr_threads = %u", nr_threads); xzfile_uncompress (argv[optind], outputfile, nr_threads); exit (EXIT_SUCCESS); } static void usage (int exitcode) { printf ("usage: pxzcat -o output [-T #threads] input.xz\n"); exit (exitcode); } static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads) { int fd, ofd; uint64_t size; lzma_index *idx; /* Open the file. 
*/ fd = open (filename, O_RDONLY); if (fd == -1) error (EXIT_FAILURE, errno, "open: %s", filename); /* Check file magic. */ if (!check_header_magic (fd)) error (EXIT_FAILURE, 0, "%s: not an xz file", filename); /* Read and parse the indexes. */ idx = parse_indexes (filename, fd); /* Get the file uncompressed size, create the output file. */ size = lzma_index_uncompressed_size (idx); debug ("uncompressed size = %" PRIu64 " bytes", size); /* Avoid annoying ext4 auto_da_alloc which causes a flush on close * unless we are very careful about not truncating the file when it * has zero size. (Thanks Eric Sandeen) */ unlink (outputfile); ofd = open (outputfile, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0644); if (ofd == -1) error (EXIT_FAILURE, errno, "open: %s", outputfile); /* See above about auto_da_alloc. */ write (ofd, "\0", 1); if (ftruncate (ofd, size) == -1) error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile); /* Tell the kernel we won't read the output file. */ posix_fadvise (fd, 0, 0, POSIX_FADV_RANDOM|POSIX_FADV_DONTNEED); /* Iterate over blocks. */ iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd); close (fd); } static int check_header_magic (int fd) { char buf[XZ_HEADER_MAGIC_LEN]; if (lseek (fd, 0, SEEK_SET) == -1) return 0; if (read (fd, buf, XZ_HEADER_MAGIC_LEN) != XZ_HEADER_MAGIC_LEN) return 0; if (memcmp (buf, XZ_HEADER_MAGIC, XZ_HEADER_MAGIC_LEN) != 0) return 0; return 1; } /* For explanation of this function, see src/xz/list.c:parse_indexes * in the xz sources. */ static lzma_index * parse_indexes (const char *filename, int fd) { lzma_ret r; off_t pos, index_size; uint8_t footer[LZMA_STREAM_HEADER_SIZE]; uint8_t header[LZMA_STREAM_HEADER_SIZE]; lzma_stream_flags footer_flags; lzma_stream_flags header_flags; lzma_stream strm = LZMA_STREAM_INIT; ssize_t n; lzma_index *combined_index = NULL; lzma_index *this_index = NULL; lzma_vli stream_padding = 0; size_t nr_streams = 0; /* Check file size is a multiple of 4 bytes. 
*/ pos = lseek (fd, 0, SEEK_END); if (pos == (off_t) -1) error (EXIT_FAILURE, errno, "%s: lseek", filename); if ((pos & 3) != 0) error (EXIT_FAILURE, 0, "%s: not an xz file: size is not a multiple of 4 bytes", filename); /* Jump backwards through the file identifying each stream. */ while (pos > 0) { debug ("looping through streams: pos = %" PRIu64, (uint64_t) pos); if (pos < LZMA_STREAM_HEADER_SIZE) error (EXIT_FAILURE, 0, "%s: corrupted file at %" PRIu64, filename, (uint64_t) pos); if (lseek (fd, -LZMA_STREAM_HEADER_SIZE, SEEK_CUR) == -1) error (EXIT_FAILURE, errno, "%s: lseek", filename); if (read (fd, footer, LZMA_STREAM_HEADER_SIZE) != LZMA_STREAM_HEADER_SIZE) error (EXIT_FAILURE, errno, "%s: read stream footer", filename); /* Skip stream padding. */ if (footer[8] == 0 && footer[9] == 0 && footer[10] == 0 && footer[11] == 0) { stream_padding += 4; pos -= 4; continue; } pos -= LZMA_STREAM_HEADER_SIZE; nr_streams++; debug ("decode stream footer at pos = %" PRIu64, (uint64_t) pos); /* Does the stream footer look reasonable? */ r = lzma_stream_footer_decode (&footer_flags, footer); if (r != LZMA_OK) error (EXIT_FAILURE, 0, "%s: invalid stream footer (error %d)", filename, r); debug ("backward_size = %" PRIu64, (uint64_t) footer_flags.backward_size); index_size = footer_flags.backward_size; if (pos < index_size + LZMA_STREAM_HEADER_SIZE) error (EXIT_FAILURE, 0, "%s: invalid stream footer", filename); pos -= index_size; debug ("decode index at pos = %" PRIu64, (uint64_t) pos); /* Seek backwards to the index of this stream. */ if (lseek (fd, pos, SEEK_SET) == -1) error (EXIT_FAILURE, errno, "%s: lseek", filename); /* Decode the index. 
*/ r = lzma_index_decoder (&strm, &this_index, UINT64_MAX); if (r != LZMA_OK) error (EXIT_FAILURE, 0, "%s: invalid stream index (error %d)", filename, r); do { uint8_t buf[BUFSIZ]; strm.avail_in = index_size; if (strm.avail_in > BUFSIZ) strm.avail_in = BUFSIZ; n = read (fd, &buf, strm.avail_in); if (n == -1) error (EXIT_FAILURE, errno, "%s: read", filename); index_size -= strm.avail_in; strm.next_in = buf; r = lzma_code (&strm, LZMA_RUN); } while (r == LZMA_OK); if (r != LZMA_STREAM_END) error (EXIT_FAILURE, 0, "%s: could not parse index (error %d)", filename, r); pos -= lzma_index_total_size (this_index) + LZMA_STREAM_HEADER_SIZE; debug ("decode stream header at pos = %" PRIu64, (uint64_t) pos); /* Read and decode the stream header. */ if (lseek (fd, pos, SEEK_SET) == -1) error (EXIT_FAILURE, errno, "%s: lseek", filename); if (read (fd, header, LZMA_STREAM_HEADER_SIZE) != LZMA_STREAM_HEADER_SIZE) error (EXIT_FAILURE, errno, "%s: read stream header", filename); r = lzma_stream_header_decode (&header_flags, header); if (r != LZMA_OK) error (EXIT_FAILURE, 0, "%s: invalid stream header (error %d)", filename, r); /* Header and footer of the stream should be equal. */ r = lzma_stream_flags_compare (&header_flags, &footer_flags); if (r != LZMA_OK) error (EXIT_FAILURE, 0, "%s: header and footer of stream are not equal (error %d)", filename, r); /* Store the decoded stream flags in this_index. */ r = lzma_index_stream_flags (this_index, &footer_flags); if (r != LZMA_OK) error (EXIT_FAILURE, 0, "%s: cannot read stream_flags from index (error %d)", filename, r); /* Store the amount of stream padding so far. Needed to calculate * compressed offsets correctly in multi-stream files. 
*/ r = lzma_index_stream_padding (this_index, stream_padding); if (r != LZMA_OK) error (EXIT_FAILURE, 0, "%s: cannot set stream_padding in index (error %d)", filename, r); if (combined_index != NULL) { r = lzma_index_cat (this_index, combined_index, NULL); if (r != LZMA_OK) error (EXIT_FAILURE, 0, "%s: cannot combine indexes", filename); } combined_index = this_index; this_index = NULL; } lzma_end (&strm); return combined_index; } /* Return true iff the buffer is all zero bytes. * * Note that gcc is smart enough to optimize this properly: * http://stackoverflow.com/questions/1493936/faster-means-of-checking-for-an-empty-buffer-in-c/1493989#1493989 */ static inline int is_zero (const char *buffer, size_t size) { size_t i; for (i = 0; i < size; ++i) { if (buffer[i] != 0) return 0; } return 1; } struct global_state { /* Current iterator. Threads update this, but it is protected by a * mutex, and each thread takes a copy of it when working on it. */ lzma_index_iter iter; lzma_bool iter_finished; pthread_mutex_t iter_mutex; /* Note that all threads are accessing these fds, so you have * to use pread/pwrite instead of lseek! */ /* Input file. */ const char *filename; int fd; /* Output file. */ const char *outputfile; int ofd; }; struct per_thread_state { unsigned thread_num; struct global_state *global; int status; }; /* Create threads to iterate over the blocks and uncompress. 
*/ static void *worker_thread (void *vp); static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd) { struct global_state global; struct per_thread_state per_thread[nr_threads]; pthread_t thread[nr_threads]; unsigned u, nr_errors; int err; void *status; lzma_index_iter_init (&global.iter, idx); global.iter_finished = 0; err = pthread_mutex_init (&global.iter_mutex, NULL); if (err != 0) error (EXIT_FAILURE, err, "pthread_mutex_init"); global.filename = filename; global.fd = fd; global.outputfile = outputfile; global.ofd = ofd; for (u = 0; u < nr_threads; ++u) { per_thread[u].thread_num = u; per_thread[u].global = &global; } /* Start the threads. */ for (u = 0; u < nr_threads; ++u) { err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]); if (err != 0) error (EXIT_FAILURE, err, "pthread_create (%u)", u); } /* Wait for the threads to exit. */ nr_errors = 0; for (u = 0; u < nr_threads; ++u) { err = pthread_join (thread[u], &status); if (err != 0) { error (0, err, "pthread_join (%u)", u); nr_errors++; } if (*(int *)status == -1) nr_errors++; } if (nr_errors > 0) exit (EXIT_FAILURE); } /* Iterate over the blocks and uncompress. */ static void * worker_thread (void *vp) { struct per_thread_state *state = vp; struct global_state *global = state->global; lzma_index_iter iter; int err; off_t position, oposition; uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX]; ssize_t n; lzma_block block; lzma_filter filters[LZMA_FILTERS_MAX + 1]; lzma_ret r; lzma_stream strm = LZMA_STREAM_INIT; uint8_t buf[BUFFER_SIZE]; char outbuf[BUFFER_SIZE]; size_t i; lzma_bool iter_finished; state->status = -1; for (;;) { /* Get the next block. 
*/ err = pthread_mutex_lock (&global->iter_mutex); if (err != 0) abort (); iter_finished = global->iter_finished; if (!iter_finished) { iter_finished = global->iter_finished = lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK); if (!iter_finished) /* Take a local copy of this iterator since another thread will * update the global version. */ iter = global->iter; } err = pthread_mutex_unlock (&global->iter_mutex); if (err != 0) abort (); if (iter_finished) break; /* Read the block header. Start by reading a single byte which * tell us how big the block header is. */ position = iter.block.compressed_file_offset; n = pread (global->fd, header, 1, position); if (n == 0) { error (0, 0, "%s: read: unexpected end of file reading block header byte", global->filename); return &state->status; } if (n == -1) { error (0, errno, "%s: read", global->filename); return &state->status; } position++; if (header[0] == '\0') { error (0, errno, "%s: read: unexpected invalid block in file, header[0] = 0", global->filename); return &state->status; } block.version = 0; block.check = iter.stream.flags->check; block.filters = filters; block.header_size = lzma_block_header_size_decode (header[0]); /* Now read and decode the block header. */ n = pread (global->fd, &header[1], block.header_size-1, position); if (n >= 0 && n != block.header_size-1) { error (0, 0, "%s: read: unexpected end of file reading block header", global->filename); return &state->status; } if (n == -1) { error (0, errno, "%s: read", global->filename); return &state->status; } position += n; r = lzma_block_header_decode (&block, NULL, header); if (r != LZMA_OK) { error (0, errno, "%s: invalid block header (error %d)", global->filename, r); return &state->status; } /* What this actually does is it checks that the block header * matches the index. 
*/ r = lzma_block_compressed_size (&block, iter.block.unpadded_size); if (r != LZMA_OK) { error (0, errno, "%s: cannot calculate compressed size (error %d)", global->filename, r); return &state->status; } /* Where we will start writing to. */ oposition = iter.block.uncompressed_file_offset; /* Read the block data and uncompress it. */ r = lzma_block_decoder (&strm, &block); if (r != LZMA_OK) { error (0, 0, "%s: invalid block (error %d)", global->filename, r); return &state->status; } strm.next_in = NULL; strm.avail_in = 0; strm.next_out = outbuf; strm.avail_out = sizeof outbuf; for (;;) { lzma_action action = LZMA_RUN; if (strm.avail_in == 0) { strm.next_in = buf; n = pread (global->fd, buf, sizeof buf, position); if (n == -1) { error (0, errno, "%s: read", global->filename); return &state->status; } position += n; strm.avail_in = n; if (n == 0) action = LZMA_FINISH; } r = lzma_code (&strm, action); if (strm.avail_out == 0 || r == LZMA_STREAM_END) { size_t wsz = sizeof outbuf - strm.avail_out; /* Don't write if the block is all zero, to preserve output file * sparseness. However we have to update oposition. */ if (!is_zero (outbuf, wsz)) { if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) { /* XXX Handle short writes. */ error (0, errno, "%s: write", global->filename); return &state->status; } } oposition += wsz; strm.next_out = outbuf; strm.avail_out = sizeof outbuf; } if (r == LZMA_STREAM_END) break; if (r != LZMA_OK) { error (0, 0, "%s: could not parse block data (error %d)", global->filename, r); return &state->status; } } lzma_end (&strm); for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i) free (filters[i].options); } state->status = 0; return &state->status; }