From 0176eab5339f4b552717cca8a57231efb16dc2a6 Mon Sep 17 00:00:00 2001 From: "Richard W.M. Jones" Date: Mon, 21 Oct 2013 14:40:01 +0100 Subject: [PATCH] Add threading support. --- .gitignore | 1 + Makefile.am | 3 +- pxzcat.c | 262 +++++++++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 210 insertions(+), 56 deletions(-) diff --git a/.gitignore b/.gitignore index 483e91c..e288ce8 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.o .deps +.gdb_history Makefile Makefile.in diff --git a/Makefile.am b/Makefile.am index 324e946..ddc657a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,4 +3,5 @@ ACLOCAL_AMFLAGS = -I m4 bin_PROGRAMS = pxzcat pxzcat_SOURCES = pxzcat.c -pxzcat_LDADD = $(LIBLZMA_LIBS) +pxzcat_CFLAGS = -pthread +pxzcat_LDADD = $(LIBLZMA_LIBS) -lpthread diff --git a/pxzcat.c b/pxzcat.c index 18cc751..35d0422 100644 --- a/pxzcat.c +++ b/pxzcat.c @@ -44,6 +44,7 @@ #include #include #include +#include #include @@ -55,30 +56,35 @@ #define debug(fs,...) /* nothing */ #endif +/* Size of buffers used in decompression loop. */ +#define BUFFER_SIZE (64*1024) + #define XZ_HEADER_MAGIC "\xfd" "7zXZ\0" #define XZ_HEADER_MAGIC_LEN 6 #define XZ_FOOTER_MAGIC "YZ" #define XZ_FOOTER_MAGIC_LEN 2 static void usage (int exitcode); -static void xzfile_uncompress (const char *filename, const char *outputfile); +static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads); static int check_header_magic (int fd); static lzma_index *parse_indexes (const char *filename, int fd); -static void iter_blocks (lzma_index *idx, const char *filename, int fd, const char *outputfile, int ofd); +static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd); static struct option long_options[] = { { "output", required_argument, 0, 'o' }, + { "threads", required_argument, 0, 'T' }, { "help", 0, 0, '?' }, { NULL, 0, 0, 0 } }; -static const char *options = "o:"; +static const char *options = "o:T:"; int main (int argc, char *argv[]) { int c; int longopt_index; + unsigned nr_threads = 0; const char *outputfile = NULL; for (;;) { @@ -95,6 +101,11 @@ main (int argc, char *argv[]) outputfile = optarg; break; + case 'T': + if (sscanf (optarg, "%u", &nr_threads) != 1) + error (EXIT_FAILURE, 0, "cannot parse -T option"); + break; + case '?': usage (EXIT_SUCCESS); @@ -103,13 +114,21 @@ main (int argc, char *argv[]) } } - if (outputfile == NULL) - error (EXIT_FAILURE, 0, "you must give the -o (output file) option\n"); - if (optind != argc - 1) usage (EXIT_FAILURE); - xzfile_uncompress (argv[optind], outputfile); + if (outputfile == NULL) + error (EXIT_FAILURE, 0, "you must give the -o (output file) option"); + + /* -T 0 (default) means use all cores. */ + if (nr_threads == 0) { + long i = sysconf (_SC_NPROCESSORS_ONLN); + if (i <= 0) + error (EXIT_FAILURE, errno, "could not get number of cores"); + nr_threads = (unsigned) i; + } + + xzfile_uncompress (argv[optind], outputfile, nr_threads); exit (EXIT_SUCCESS); } @@ -117,12 +136,13 @@ main (int argc, char *argv[]) static void usage (int exitcode) { - printf ("usage: pxzcat -o output input.xz\n"); + printf ("usage: pxzcat -o output [-T #threads] input.xz\n"); exit (exitcode); } static void -xzfile_uncompress (const char *filename, const char *outputfile) +xzfile_uncompress (const char *filename, const char *outputfile, + unsigned nr_threads) { int fd, ofd; uint64_t size; @@ -150,8 +170,8 @@ xzfile_uncompress (const char *filename, const char *outputfile) if (ftruncate (ofd, size) == -1) error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile); - /* Iterate over blocks and uncompress. */ - iter_blocks (idx, filename, fd, outputfile, ofd); + /* Iterate over blocks. */ + iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd); close (fd); } @@ -325,8 +345,6 @@ parse_indexes (const char *filename, int fd) return combined_index; } -#define BUFFER_SIZE (64*1024) - /* Return true iff the buffer is all zero bytes. * * Note that gcc is smart enough to optimize this properly: @@ -345,12 +363,95 @@ is_zero (const char *buffer, size_t size) return 1; } -/* Iterate over the blocks and uncompress. */ +struct global_state { + /* Current iterator. Threads update this, but it is protected by a + * mutex, and each thread takes a copy of it when working on it. + */ + lzma_index_iter iter; + lzma_bool iter_finished; + pthread_mutex_t iter_mutex; + + /* Note that all threads are accessing these fds, so you have + * to use pread/pwrite instead of lseek! + */ + + /* Input file. */ + const char *filename; + int fd; + + /* Output file. */ + const char *outputfile; + int ofd; +}; + +struct per_thread_state { + unsigned thread_num; + struct global_state *global; + int status; +}; + +/* Create threads to iterate over the blocks and uncompress. */ +static void *worker_thread (void *vp); + static void -iter_blocks (lzma_index *idx, +iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd) { + struct global_state global; + struct per_thread_state per_thread[nr_threads]; + pthread_t thread[nr_threads]; + unsigned u, nr_errors; + int err; + void *status; + + lzma_index_iter_init (&global.iter, idx); + global.iter_finished = 0; + err = pthread_mutex_init (&global.iter_mutex, NULL); + if (err != 0) + error (EXIT_FAILURE, err, "pthread_mutex_init"); + + global.filename = filename; + global.fd = fd; + global.outputfile = outputfile; + global.ofd = ofd; + + for (u = 0; u < nr_threads; ++u) { + per_thread[u].thread_num = u; + per_thread[u].global = &global; + } + + /* Start the threads. */ + for (u = 0; u < nr_threads; ++u) { + err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]); + if (err != 0) + error (EXIT_FAILURE, err, "pthread_create (%u)", u); + } + + /* Wait for the threads to exit. */ + nr_errors = 0; + for (u = 0; u < nr_threads; ++u) { + err = pthread_join (thread[u], &status); + if (err != 0) { + error (0, err, "pthread_join (%u)", u); + nr_errors++; + } + if (*(int *)status == -1) + nr_errors++; + } + + if (nr_errors > 0) + exit (EXIT_FAILURE); +} + +/* Iterate over the blocks and uncompress. */ +static void * +worker_thread (void *vp) +{ + struct per_thread_state *state = vp; + struct global_state *global = state->global; lzma_index_iter iter; + int err; + off_t position, oposition; uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX]; ssize_t n; lzma_block block; @@ -359,28 +460,52 @@ iter_blocks (lzma_index *idx, lzma_stream strm = LZMA_STREAM_INIT; char outbuf[BUFFER_SIZE]; size_t i; + lzma_bool iter_finished; - lzma_index_iter_init (&iter, idx); - while (!lzma_index_iter_next (&iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK)) { - /* Seek to the start of the block in the input file. */ - if (lseek (fd, iter.block.compressed_file_offset, SEEK_SET) == -1) - error (EXIT_FAILURE, errno, "lseek"); + state->status = -1; + + for (;;) { + /* Get the next block. */ + err = pthread_mutex_lock (&global->iter_mutex); + if (err != 0) abort (); + iter_finished = global->iter_finished; + if (!iter_finished) { + iter_finished = global->iter_finished = + lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK); + if (!iter_finished) + /* Take a local copy of this iterator since another thread will + * update the global version. + */ + iter = global->iter; + } + err = pthread_mutex_unlock (&global->iter_mutex); + if (err != 0) abort (); + if (iter_finished) + break; /* Read the block header. Start by reading a single byte which * tell us how big the block header is. */ - n = read (fd, header, 1); - if (n == 0) - error (EXIT_FAILURE, 0, + position = iter.block.compressed_file_offset; + n = pread (global->fd, header, 1, position); + if (n == 0) { + error (0, 0, "%s: read: unexpected end of file reading block header byte", - filename); - if (n == -1) - error (EXIT_FAILURE, errno, "%s: read", filename); + global->filename); + return &state->status; + } + if (n == -1) { + error (0, errno, "%s: read", global->filename); + return &state->status; + } + position++; - if (header[0] == '\0') - error (EXIT_FAILURE, errno, + if (header[0] == '\0') { + error (0, errno, "%s: read: unexpected invalid block in file, header[0] = 0", - filename); + global->filename); + return &state->status; + } block.version = 0; block.check = iter.stream.flags->check; @@ -388,31 +513,46 @@ iter_blocks (lzma_index *idx, block.header_size = lzma_block_header_size_decode (header[0]); /* Now read and decode the block header. */ - n = read (fd, &header[1], block.header_size-1); - if (n >= 0 && n != block.header_size-1) - error (EXIT_FAILURE, 0, + n = pread (global->fd, &header[1], block.header_size-1, position); + if (n >= 0 && n != block.header_size-1) { + error (0, 0, "%s: read: unexpected end of file reading block header", - filename); - if (n == -1) - error (EXIT_FAILURE, errno, "%s: read", filename); + global->filename); + return &state->status; + } + if (n == -1) { + error (0, errno, "%s: read", global->filename); + return &state->status; + } + position += n; r = lzma_block_header_decode (&block, NULL, header); - if (r != LZMA_OK) - error (EXIT_FAILURE, errno, "%s: invalid block header (error %d)", - filename, r); + if (r != LZMA_OK) { + error (0, errno, "%s: invalid block header (error %d)", + global->filename, r); + return &state->status; + } /* What this actually does is it checks that the block header * matches the index. */ r = lzma_block_compressed_size (&block, iter.block.unpadded_size); - if (r != LZMA_OK) - error (EXIT_FAILURE, errno, - "%s: cannot calculate compressed size (error %d)", filename, r); + if (r != LZMA_OK) { + error (0, errno, + "%s: cannot calculate compressed size (error %d)", + global->filename, r); + return &state->status; + } + + /* Where we will start writing to. */ + oposition = iter.block.uncompressed_file_offset; /* Read the block data and uncompress it. */ r = lzma_block_decoder (&strm, &block); - if (r != LZMA_OK) - error (EXIT_FAILURE, 0, "%s: invalid block (error %d)", filename, r); + if (r != LZMA_OK) { + error (0, 0, "%s: invalid block (error %d)", global->filename, r); + return &state->status; + } strm.next_in = NULL; strm.avail_in = 0; @@ -425,9 +565,12 @@ iter_blocks (lzma_index *idx, if (strm.avail_in == 0) { strm.next_in = buf; - n = read (fd, buf, sizeof buf); - if (n == -1) - error (EXIT_FAILURE, errno, "%s: read", filename); + n = pread (global->fd, buf, sizeof buf, position); + if (n == -1) { + error (0, errno, "%s: read", global->filename); + return &state->status; + } + position += n; strm.avail_in = n; if (n == 0) action = LZMA_FINISH; @@ -438,14 +581,17 @@ iter_blocks (lzma_index *idx, if (strm.avail_out == 0 || r == LZMA_STREAM_END) { size_t wsz = sizeof outbuf - strm.avail_out; - if (is_zero (outbuf, wsz)) { /* Seek to preserve sparseness. */ - if (lseek (ofd, wsz, SEEK_CUR) == (off_t) -1) - error (EXIT_FAILURE, errno, "%s: seek", filename); - } else { - if (write (ofd, outbuf, wsz) != wsz) + /* Don't write if the block is all zero, to preserve output file + * sparseness. However we have to update oposition. + */ + if (!is_zero (outbuf, wsz)) { + if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) { /* XXX Handle short writes. */ - error (EXIT_FAILURE, errno, "%s: write", filename); + error (0, errno, "%s: write", global->filename); + return &state->status; + } } + oposition += wsz; strm.next_out = outbuf; strm.avail_out = sizeof outbuf; @@ -453,9 +599,12 @@ iter_blocks (lzma_index *idx, if (r == LZMA_STREAM_END) break; - if (r != LZMA_OK) - error (EXIT_FAILURE, 0, - "%s: could not parse block data (error %d)", filename, r); + if (r != LZMA_OK) { + error (0, 0, + "%s: could not parse block data (error %d)", + global->filename, r); + return &state->status; + } } lzma_end (&strm); @@ -463,4 +612,7 @@ iter_blocks (lzma_index *idx, for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i) free (filters[i].options); } + + state->status = 0; + return &state->status; } -- 1.8.3.1