Update README to point out that you shouldn't use this project in production.
[pxzcat.git] / pxzcat.c
1 /* pxzcat derived from nbdkit
2  * Copyright (C) 2013 Red Hat Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  *
12  * * Redistributions in binary form must reproduce the above copyright
13  * notice, this list of conditions and the following disclaimer in the
14  * documentation and/or other materials provided with the distribution.
15  *
16  * * Neither the name of Red Hat nor the names of its contributors may be
17  * used to endorse or promote products derived from this software without
18  * specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
21  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
23  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
27  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
28  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
29  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
30  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31  * SUCH DAMAGE.
32  */
33
34 #include <config.h>
35
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <stdint.h>
40 #include <inttypes.h>
41 #include <unistd.h>
42 #include <fcntl.h>
43 #include <sys/types.h>
44 #include <error.h>
45 #include <errno.h>
46 #include <getopt.h>
47 #include <pthread.h>
48
49 #include <lzma.h>
50
51 #define DEBUG 0
52
53 #if DEBUG
54 #define debug(fs,...) fprintf (stderr, "pxzcat: debug: " fs "\n", ## __VA_ARGS__)
55 #else
56 #define debug(fs,...) /* nothing */
57 #endif
58
59 /* Size of buffers used in decompression loop. */
60 #define BUFFER_SIZE (64*1024)
61
62 #define XZ_HEADER_MAGIC     "\xfd" "7zXZ\0"
63 #define XZ_HEADER_MAGIC_LEN 6
64 #define XZ_FOOTER_MAGIC     "YZ"
65 #define XZ_FOOTER_MAGIC_LEN 2
66
67 static void usage (int exitcode);
68 static void xzfile_uncompress (const char *filename, const char *outputfile, unsigned nr_threads);
69 static int check_header_magic (int fd);
70 static lzma_index *parse_indexes (const char *filename, int fd);
71 static void iter_blocks (lzma_index *idx, unsigned nr_threads, const char *filename, int fd, const char *outputfile, int ofd);
72
73 static struct option long_options[] = {
74   { "output",   required_argument,  0, 'o' },
75   { "threads",  required_argument,  0, 'T' },
76   { "help",     0,                  0, '?' },
77   { NULL,       0,                  0, 0   }
78 };
79
80 static const char *options = "o:T:";
81
82 int
83 main (int argc, char *argv[])
84 {
85   int c;
86   int longopt_index;
87   unsigned nr_threads = 0;
88   const char *outputfile = NULL;
89
90   for (;;) {
91     c = getopt_long (argc, argv, options, long_options, &longopt_index);
92     if (c == -1)
93       break;
94
95     switch (c) {
96       /* Long option with no short opt equivalent. */
97     case 0:
98       abort ();
99
100     case 'o':
101       outputfile = optarg;
102       break;
103
104     case 'T':
105       if (sscanf (optarg, "%u", &nr_threads) != 1)
106         error (EXIT_FAILURE, 0, "cannot parse -T option");
107       break;
108
109     case '?':
110       usage (EXIT_SUCCESS);
111
112     default:
113       usage (EXIT_FAILURE);
114     }
115   }
116
117   if (optind != argc - 1)
118     usage (EXIT_FAILURE);
119
120   if (outputfile == NULL)
121     error (EXIT_FAILURE, 0, "you must give the -o (output file) option");
122
123   /* -T 0 (default) means use all cores. */
124   if (nr_threads == 0) {
125     long i = sysconf (_SC_NPROCESSORS_ONLN);
126     if (i <= 0)
127       error (EXIT_FAILURE, errno, "could not get number of cores");
128     nr_threads = (unsigned) i;
129   }
130   debug ("nr_threads = %u", nr_threads);
131
132   xzfile_uncompress (argv[optind], outputfile, nr_threads);
133
134   exit (EXIT_SUCCESS);
135 }
136
137 static void
138 usage (int exitcode)
139 {
140   printf ("usage: pxzcat -o output [-T #threads] input.xz\n");
141   exit (exitcode);
142 }
143
144 static void
145 xzfile_uncompress (const char *filename, const char *outputfile,
146                    unsigned nr_threads)
147 {
148   int fd, ofd;
149   uint64_t size;
150   lzma_index *idx;
151
152   /* Open the file. */
153   fd = open (filename, O_RDONLY);
154   if (fd == -1)
155     error (EXIT_FAILURE, errno, "open: %s", filename);
156
157   /* Check file magic. */
158   if (!check_header_magic (fd))
159     error (EXIT_FAILURE, 0, "%s: not an xz file", filename);
160
161   /* Read and parse the indexes. */
162   idx = parse_indexes (filename, fd);
163
164   /* Get the file uncompressed size, create the output file. */
165   size = lzma_index_uncompressed_size (idx);
166   debug ("uncompressed size = %" PRIu64 " bytes", size);
167
168   /* Avoid annoying ext4 auto_da_alloc which causes a flush on close
169    * unless we are very careful about not truncating a regular file
170    * from non-zero size to zero size.  (Thanks Eric Sandeen)
171    */
172   ofd = open (outputfile, O_WRONLY|O_CREAT|O_NOCTTY, 0644);
173   if (ofd == -1)
174     error (EXIT_FAILURE, errno, "open: %s", outputfile);
175
176   if (ftruncate (ofd, 1) == -1)
177     error (EXIT_FAILURE, errno, "ftruncate (1 byte): %s", outputfile);
178
179   if (lseek (ofd, 0, SEEK_SET) == -1)
180     error (EXIT_FAILURE, errno, "lseek: %s", outputfile);
181
182   if (write (ofd, "\0", 1) == -1)
183     error (EXIT_FAILURE, errno, "write: %s", outputfile);
184
185   if (ftruncate (ofd, size) == -1)
186     error (EXIT_FAILURE, errno, "ftruncate: %s", outputfile);
187
188   /* Tell the kernel we won't read the output file. */
189   posix_fadvise (fd, 0, 0, POSIX_FADV_RANDOM|POSIX_FADV_DONTNEED);
190
191   /* Iterate over blocks. */
192   iter_blocks (idx, nr_threads, filename, fd, outputfile, ofd);
193
194   close (fd);
195 }
196
197 static int
198 check_header_magic (int fd)
199 {
200   char buf[XZ_HEADER_MAGIC_LEN];
201
202   if (lseek (fd, 0, SEEK_SET) == -1)
203     return 0;
204   if (read (fd, buf, XZ_HEADER_MAGIC_LEN) != XZ_HEADER_MAGIC_LEN)
205     return 0;
206   if (memcmp (buf, XZ_HEADER_MAGIC, XZ_HEADER_MAGIC_LEN) != 0)
207     return 0;
208   return 1;
209 }
210
211 /* For explanation of this function, see src/xz/list.c:parse_indexes
212  * in the xz sources.
213  */
214 static lzma_index *
215 parse_indexes (const char *filename, int fd)
216 {
217   lzma_ret r;
218   off_t pos, index_size;
219   uint8_t footer[LZMA_STREAM_HEADER_SIZE];
220   uint8_t header[LZMA_STREAM_HEADER_SIZE];
221   lzma_stream_flags footer_flags;
222   lzma_stream_flags header_flags;
223   lzma_stream strm = LZMA_STREAM_INIT;
224   ssize_t n;
225   lzma_index *combined_index = NULL;
226   lzma_index *this_index = NULL;
227   lzma_vli stream_padding = 0;
228   size_t nr_streams = 0;
229
230   /* Check file size is a multiple of 4 bytes. */
231   pos = lseek (fd, 0, SEEK_END);
232   if (pos == (off_t) -1)
233     error (EXIT_FAILURE, errno, "%s: lseek", filename);
234
235   if ((pos & 3) != 0)
236     error (EXIT_FAILURE, 0,
237            "%s: not an xz file: size is not a multiple of 4 bytes",
238            filename);
239
240   /* Jump backwards through the file identifying each stream. */
241   while (pos > 0) {
242     debug ("looping through streams: pos = %" PRIu64, (uint64_t) pos);
243
244     if (pos < LZMA_STREAM_HEADER_SIZE)
245       error (EXIT_FAILURE, 0,
246              "%s: corrupted file at %" PRIu64, filename, (uint64_t) pos);
247
248     if (lseek (fd, -LZMA_STREAM_HEADER_SIZE, SEEK_CUR) == -1)
249       error (EXIT_FAILURE, errno, "%s: lseek", filename);
250
251     if (read (fd, footer, LZMA_STREAM_HEADER_SIZE) != LZMA_STREAM_HEADER_SIZE)
252       error (EXIT_FAILURE, errno, "%s: read stream footer", filename);
253
254     /* Skip stream padding. */
255     if (footer[8] == 0 && footer[9] == 0 &&
256         footer[10] == 0 && footer[11] == 0) {
257       stream_padding += 4;
258       pos -= 4;
259       continue;
260     }
261
262     pos -= LZMA_STREAM_HEADER_SIZE;
263     nr_streams++;
264
265     debug ("decode stream footer at pos = %" PRIu64, (uint64_t) pos);
266
267     /* Does the stream footer look reasonable? */
268     r = lzma_stream_footer_decode (&footer_flags, footer);
269     if (r != LZMA_OK)
270       error (EXIT_FAILURE, 0,
271              "%s: invalid stream footer (error %d)", filename, r);
272
273     debug ("backward_size = %" PRIu64, (uint64_t) footer_flags.backward_size);
274     index_size = footer_flags.backward_size;
275     if (pos < index_size + LZMA_STREAM_HEADER_SIZE)
276       error (EXIT_FAILURE, 0, "%s: invalid stream footer", filename);
277
278     pos -= index_size;
279     debug ("decode index at pos = %" PRIu64, (uint64_t) pos);
280
281     /* Seek backwards to the index of this stream. */
282     if (lseek (fd, pos, SEEK_SET) == -1)
283       error (EXIT_FAILURE, errno, "%s: lseek", filename);
284
285     /* Decode the index. */
286     r = lzma_index_decoder (&strm, &this_index, UINT64_MAX);
287     if (r != LZMA_OK)
288       error (EXIT_FAILURE, 0,
289              "%s: invalid stream index (error %d)", filename, r);
290
291     do {
292       uint8_t buf[BUFSIZ];
293
294       strm.avail_in = index_size;
295       if (strm.avail_in > BUFSIZ)
296         strm.avail_in = BUFSIZ;
297
298       n = read (fd, &buf, strm.avail_in);
299       if (n == -1)
300         error (EXIT_FAILURE, errno, "%s: read", filename);
301
302       index_size -= strm.avail_in;
303
304       strm.next_in = buf;
305       r = lzma_code (&strm, LZMA_RUN);
306     } while (r == LZMA_OK);
307
308     if (r != LZMA_STREAM_END)
309       error (EXIT_FAILURE, 0, "%s: could not parse index (error %d)",
310              filename, r);
311
312     pos -= lzma_index_total_size (this_index) + LZMA_STREAM_HEADER_SIZE;
313
314     debug ("decode stream header at pos = %" PRIu64, (uint64_t) pos);
315
316     /* Read and decode the stream header. */
317     if (lseek (fd, pos, SEEK_SET) == -1)
318       error (EXIT_FAILURE, errno, "%s: lseek", filename);
319
320     if (read (fd, header, LZMA_STREAM_HEADER_SIZE) != LZMA_STREAM_HEADER_SIZE)
321       error (EXIT_FAILURE, errno, "%s: read stream header", filename);
322
323     r = lzma_stream_header_decode (&header_flags, header);
324     if (r != LZMA_OK)
325       error (EXIT_FAILURE, 0,
326              "%s: invalid stream header (error %d)", filename, r);
327
328     /* Header and footer of the stream should be equal. */
329     r = lzma_stream_flags_compare (&header_flags, &footer_flags);
330     if (r != LZMA_OK)
331       error (EXIT_FAILURE, 0,
332              "%s: header and footer of stream are not equal (error %d)",
333              filename, r);
334
335     /* Store the decoded stream flags in this_index. */
336     r = lzma_index_stream_flags (this_index, &footer_flags);
337     if (r != LZMA_OK)
338       error (EXIT_FAILURE, 0,
339              "%s: cannot read stream_flags from index (error %d)",
340              filename, r);
341
342     /* Store the amount of stream padding so far.  Needed to calculate
343      * compressed offsets correctly in multi-stream files.
344      */
345     r = lzma_index_stream_padding (this_index, stream_padding);
346     if (r != LZMA_OK)
347       error (EXIT_FAILURE, 0,
348              "%s: cannot set stream_padding in index (error %d)",
349              filename, r);
350
351     if (combined_index != NULL) {
352       r = lzma_index_cat (this_index, combined_index, NULL);
353       if (r != LZMA_OK)
354         error (EXIT_FAILURE, 0, "%s: cannot combine indexes", filename);
355     }
356
357     combined_index = this_index;
358     this_index = NULL;
359   }
360
361   lzma_end (&strm);
362
363   return combined_index;
364 }
365
366 /* Return true iff the buffer is all zero bytes.
367  *
368  * Note that gcc is smart enough to optimize this properly:
369  * http://stackoverflow.com/questions/1493936/faster-means-of-checking-for-an-empty-buffer-in-c/1493989#1493989
370  */
371 static inline int
372 is_zero (const char *buffer, size_t size)
373 {
374   size_t i;
375
376   for (i = 0; i < size; ++i) {
377     if (buffer[i] != 0)
378       return 0;
379   }
380
381   return 1;
382 }
383
384 struct global_state {
385   /* Current iterator.  Threads update this, but it is protected by a
386    * mutex, and each thread takes a copy of it when working on it.
387    */
388   lzma_index_iter iter;
389   lzma_bool iter_finished;
390   pthread_mutex_t iter_mutex;
391
392   /* Note that all threads are accessing these fds, so you have
393    * to use pread/pwrite instead of lseek!
394    */
395
396   /* Input file. */
397   const char *filename;
398   int fd;
399
400   /* Output file. */
401   const char *outputfile;
402   int ofd;
403 };
404
405 struct per_thread_state {
406   unsigned thread_num;
407   struct global_state *global;
408   int status;
409 };
410
411 /* Create threads to iterate over the blocks and uncompress. */
412 static void *worker_thread (void *vp);
413
414 static void
415 iter_blocks (lzma_index *idx, unsigned nr_threads,
416              const char *filename, int fd, const char *outputfile, int ofd)
417 {
418   struct global_state global;
419   struct per_thread_state per_thread[nr_threads];
420   pthread_t thread[nr_threads];
421   unsigned u, nr_errors;
422   int err;
423   void *status;
424
425   lzma_index_iter_init (&global.iter, idx);
426   global.iter_finished = 0;
427   err = pthread_mutex_init (&global.iter_mutex, NULL);
428   if (err != 0)
429     error (EXIT_FAILURE, err, "pthread_mutex_init");
430
431   global.filename = filename;
432   global.fd = fd;
433   global.outputfile = outputfile;
434   global.ofd = ofd;
435
436   for (u = 0; u < nr_threads; ++u) {
437     per_thread[u].thread_num = u;
438     per_thread[u].global = &global;
439   }
440
441   /* Start the threads. */
442   for (u = 0; u < nr_threads; ++u) {
443     err = pthread_create (&thread[u], NULL, worker_thread, &per_thread[u]);
444     if (err != 0)
445       error (EXIT_FAILURE, err, "pthread_create (%u)", u);
446   }
447
448   /* Wait for the threads to exit. */
449   nr_errors = 0;
450   for (u = 0; u < nr_threads; ++u) {
451     err = pthread_join (thread[u], &status);
452     if (err != 0) {
453       error (0, err, "pthread_join (%u)", u);
454       nr_errors++;
455     }
456     if (*(int *)status == -1)
457       nr_errors++;
458   }
459
460   if (nr_errors > 0)
461     exit (EXIT_FAILURE);
462 }
463
464 /* Iterate over the blocks and uncompress. */
465 static void *
466 worker_thread (void *vp)
467 {
468   struct per_thread_state *state = vp;
469   struct global_state *global = state->global;
470   lzma_index_iter iter;
471   int err;
472   off_t position, oposition;
473   uint8_t header[LZMA_BLOCK_HEADER_SIZE_MAX];
474   ssize_t n;
475   lzma_block block;
476   lzma_filter filters[LZMA_FILTERS_MAX + 1];
477   lzma_ret r;
478   lzma_stream strm = LZMA_STREAM_INIT;
479   uint8_t buf[BUFFER_SIZE];
480   char outbuf[BUFFER_SIZE];
481   size_t i;
482   lzma_bool iter_finished;
483
484   state->status = -1;
485
486   for (;;) {
487     /* Get the next block. */
488     err = pthread_mutex_lock (&global->iter_mutex);
489     if (err != 0) abort ();
490     iter_finished = global->iter_finished;
491     if (!iter_finished) {
492       iter_finished = global->iter_finished =
493         lzma_index_iter_next (&global->iter, LZMA_INDEX_ITER_NONEMPTY_BLOCK);
494       if (!iter_finished)
495         /* Take a local copy of this iterator since another thread will
496          * update the global version.
497          */
498         iter = global->iter;
499     }
500     err = pthread_mutex_unlock (&global->iter_mutex);
501     if (err != 0) abort ();
502     if (iter_finished)
503       break;
504
505     /* Read the block header.  Start by reading a single byte which
506      * tell us how big the block header is.
507      */
508     position = iter.block.compressed_file_offset;
509     n = pread (global->fd, header, 1, position);
510     if (n == 0) {
511       error (0, 0,
512              "%s: read: unexpected end of file reading block header byte",
513              global->filename);
514       return &state->status;
515     }
516     if (n == -1) {
517       error (0, errno, "%s: read", global->filename);
518       return &state->status;
519     }
520     position++;
521
522     if (header[0] == '\0') {
523       error (0, errno,
524              "%s: read: unexpected invalid block in file, header[0] = 0",
525              global->filename);
526       return &state->status;
527     }
528
529     block.version = 0;
530     block.check = iter.stream.flags->check;
531     block.filters = filters;
532     block.header_size = lzma_block_header_size_decode (header[0]);
533
534     /* Now read and decode the block header. */
535     n = pread (global->fd, &header[1], block.header_size-1, position);
536     if (n >= 0 && n != block.header_size-1) {
537       error (0, 0,
538              "%s: read: unexpected end of file reading block header",
539              global->filename);
540       return &state->status;
541     }
542     if (n == -1) {
543       error (0, errno, "%s: read", global->filename);
544       return &state->status;
545     }
546     position += n;
547
548     r = lzma_block_header_decode (&block, NULL, header);
549     if (r != LZMA_OK) {
550       error (0, errno, "%s: invalid block header (error %d)",
551              global->filename, r);
552       return &state->status;
553     }
554
555     /* What this actually does is it checks that the block header
556      * matches the index.
557      */
558     r = lzma_block_compressed_size (&block, iter.block.unpadded_size);
559     if (r != LZMA_OK) {
560       error (0, errno,
561              "%s: cannot calculate compressed size (error %d)",
562              global->filename, r);
563       return &state->status;
564     }
565
566     /* Where we will start writing to. */
567     oposition = iter.block.uncompressed_file_offset;
568
569     /* Read the block data and uncompress it. */
570     r = lzma_block_decoder (&strm, &block);
571     if (r != LZMA_OK) {
572       error (0, 0, "%s: invalid block (error %d)", global->filename, r);
573       return &state->status;
574     }
575
576     strm.next_in = NULL;
577     strm.avail_in = 0;
578     strm.next_out = outbuf;
579     strm.avail_out = sizeof outbuf;
580
581     for (;;) {
582       lzma_action action = LZMA_RUN;
583
584       if (strm.avail_in == 0) {
585         strm.next_in = buf;
586         n = pread (global->fd, buf, sizeof buf, position);
587         if (n == -1) {
588           error (0, errno, "%s: read", global->filename);
589           return &state->status;
590         }
591         position += n;
592         strm.avail_in = n;
593         if (n == 0)
594           action = LZMA_FINISH;
595       }
596
597       r = lzma_code (&strm, action);
598
599       if (strm.avail_out == 0 || r == LZMA_STREAM_END) {
600         size_t wsz = sizeof outbuf - strm.avail_out;
601
602         /* Don't write if the block is all zero, to preserve output file
603          * sparseness.  However we have to update oposition.
604          */
605         if (!is_zero (outbuf, wsz)) {
606           if (pwrite (global->ofd, outbuf, wsz, oposition) != wsz) {
607             /* XXX Handle short writes. */
608             error (0, errno, "%s: write", global->filename);
609             return &state->status;
610           }
611         }
612         oposition += wsz;
613
614         strm.next_out = outbuf;
615         strm.avail_out = sizeof outbuf;
616       }
617
618       if (r == LZMA_STREAM_END)
619         break;
620       if (r != LZMA_OK) {
621         error (0, 0,
622                "%s: could not parse block data (error %d)",
623                global->filename, r);
624         return &state->status;
625       }
626     }
627
628     lzma_end (&strm);
629
630     for (i = 0; filters[i].id != LZMA_VLI_UNKNOWN; ++i)
631       free (filters[i].options);
632   }
633
634   state->status = 0;
635   return &state->status;
636 }