liblzma/
write.rs

1//! Writer-based compression/decompression streams
2
3mod auto_finish;
4
5use std::io;
6use std::io::prelude::*;
7
8#[cfg(feature = "parallel")]
9use crate::stream::MtStreamBuilder;
10use crate::stream::{Action, Check, Status, Stream};
11pub use auto_finish::{AutoFinishXzDecoder, AutoFinishXzEncoder};
12
13/// A compression stream which will have uncompressed data written to it and
14/// will write compressed data to an output stream.
15/// [XzEncoder] will no longer perform the finalization automatically in the next miner release, so you need to call [XzEncoder::finish] manually.
16/// If you want to automate the finalization process, please use [XzEncoder::auto_finish].
17pub struct XzEncoder<W: Write> {
18    data: Stream,
19    obj: Option<W>,
20    buf: Vec<u8>,
21}
22
23/// A compression stream which will have compressed data written to it and
24/// will write uncompressed data to an output stream.
25/// [XzDecoder] will no longer perform the finalization automatically in the next miner release, so you need to call [XzDecoder::finish] manually.
26/// If you want to automate the finalization process, please use [XzDecoder::auto_finish].
27pub struct XzDecoder<W: Write> {
28    data: Stream,
29    obj: Option<W>,
30    buf: Vec<u8>,
31}
32
33impl<W: Write> XzEncoder<W> {
34    /// Create a new compression stream which will compress at the given level
35    /// to write compress output to the give output stream.
36    #[inline]
37    pub fn new(obj: W, level: u32) -> XzEncoder<W> {
38        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
39        XzEncoder::new_stream(obj, stream)
40    }
41    /// Create a new parallel compression stream which will compress at the given level
42    /// to write compress output to the give output stream.
43    #[cfg(feature = "parallel")]
44    pub fn new_parallel(obj: W, level: u32) -> XzEncoder<W> {
45        let stream = MtStreamBuilder::new()
46            .preset(level)
47            .check(Check::Crc64)
48            .threads(num_cpus::get() as u32)
49            .encoder()
50            .unwrap();
51        Self::new_stream(obj, stream)
52    }
53
54    /// Create a new encoder which will use the specified `Stream` to encode
55    /// (compress) data into the provided `obj`.
56    #[inline]
57    pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> {
58        XzEncoder {
59            data: stream,
60            obj: Some(obj),
61            buf: Vec::with_capacity(32 * 1024),
62        }
63    }
64
65    /// Acquires a reference to the underlying writer.
66    #[inline]
67    pub fn get_ref(&self) -> &W {
68        self.obj.as_ref().unwrap()
69    }
70
71    /// Acquires a mutable reference to the underlying writer.
72    ///
73    /// Note that mutating the output/input state of the stream may corrupt this
74    /// object, so care must be taken when using this method.
75    #[inline]
76    pub fn get_mut(&mut self) -> &mut W {
77        self.obj.as_mut().unwrap()
78    }
79
80    fn dump(&mut self) -> io::Result<()> {
81        self.obj.as_mut().unwrap().write_all(&self.buf)?;
82        self.buf.clear();
83        Ok(())
84    }
85
86    /// Attempt to finish this output stream, writing out final chunks of data.
87    ///
88    /// Note that this function can only be used once data has finished being
89    /// written to the output stream. After this function is called then further
90    /// calls to `write` may result in a panic.
91    ///
92    /// # Panics
93    ///
94    /// Attempts to write data to this stream may result in a panic after this
95    /// function is called.
96    #[inline]
97    pub fn try_finish(&mut self) -> io::Result<()> {
98        loop {
99            self.dump()?;
100            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
101            if res == Status::StreamEnd {
102                break;
103            }
104        }
105        self.dump()
106    }
107
108    /// Consumes this encoder, finishing the compression stream.
109    ///
110    /// This will finish the underlying data stream and then return the contained
111    /// writer if the finish succeeded.
112    ///
113    /// Note that this function may not be suitable to call in a situation where
114    /// the underlying stream is an asynchronous I/O stream. To finish a stream
115    /// the `try_finish` (or `shutdown`) method should be used instead. To
116    /// re-acquire ownership of a stream it is safe to call this method after
117    /// `try_finish` or `shutdown` has returned `Ok`.
118    #[inline]
119    pub fn finish(mut self) -> io::Result<W> {
120        self.try_finish()?;
121        Ok(self.obj.take().unwrap())
122    }
123
124    /// Returns the number of bytes produced by the compressor
125    ///
126    /// Note that, due to buffering, this only bears any relation to
127    /// `total_in()` after a call to `flush()`.  At that point,
128    /// `total_out() / total_in()` is the compression ratio.
129    #[inline]
130    pub fn total_out(&self) -> u64 {
131        self.data.total_out()
132    }
133
134    /// Returns the number of bytes consumed by the compressor
135    /// (e.g. the number of bytes written to this stream.)
136    #[inline]
137    pub fn total_in(&self) -> u64 {
138        self.data.total_in()
139    }
140
141    /// Convert to [AutoFinishXzEncoder] that impl [Drop] trait.
142    /// [AutoFinishXzEncoder] automatically calls [XzDecoder::try_finish] method when exiting the scope.
143    #[inline]
144    pub fn auto_finish(self) -> AutoFinishXzEncoder<W> {
145        AutoFinishXzEncoder(self)
146    }
147}
148
149impl<W: Write> Write for XzEncoder<W> {
150    #[inline]
151    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
152        loop {
153            self.dump()?;
154
155            let total_in = self.total_in();
156            self.data.process_vec(data, &mut self.buf, Action::Run)?;
157            let written = (self.total_in() - total_in) as usize;
158
159            if written > 0 || data.is_empty() {
160                return Ok(written);
161            }
162        }
163    }
164
165    #[inline]
166    fn flush(&mut self) -> io::Result<()> {
167        loop {
168            self.dump()?;
169            let status = self
170                .data
171                .process_vec(&[], &mut self.buf, Action::FullFlush)?;
172            if status == Status::StreamEnd {
173                break;
174            }
175        }
176        self.obj.as_mut().unwrap().flush()
177    }
178}
179
180impl<W: Read + Write> Read for XzEncoder<W> {
181    #[inline]
182    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
183        self.get_mut().read(buf)
184    }
185}
186
187impl<W: Write> Drop for XzEncoder<W> {
188    #[inline]
189    fn drop(&mut self) {
190        if self.obj.is_some() {
191            let _ = self.try_finish();
192        }
193    }
194}
195
196impl<W: Write> XzDecoder<W> {
197    /// Creates a new decoding stream which will decode into `obj` one xz stream
198    /// from the input written to it.
199    #[inline]
200    pub fn new(obj: W) -> XzDecoder<W> {
201        let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
202        XzDecoder::new_stream(obj, stream)
203    }
204
205    /// Creates a new parallel decoding stream which will decode into `obj` one xz stream
206    /// from the input written to it.
207    #[cfg(feature = "parallel")]
208    pub fn new_parallel(obj: W) -> Self {
209        let stream = MtStreamBuilder::new()
210            .memlimit_stop(u64::MAX)
211            .threads(num_cpus::get() as u32)
212            .decoder()
213            .unwrap();
214        Self::new_stream(obj, stream)
215    }
216
217    /// Creates a new decoding stream which will decode into `obj` all the xz streams
218    /// from the input written to it.
219    #[inline]
220    pub fn new_multi_decoder(obj: W) -> XzDecoder<W> {
221        let stream = Stream::new_stream_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
222        XzDecoder::new_stream(obj, stream)
223    }
224
225    /// Creates a new decoding stream which will decode all input written to it
226    /// into `obj`.
227    ///
228    /// A custom `stream` can be specified to configure what format this decoder
229    /// will recognize or configure other various decoding options.
230    #[inline]
231    pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> {
232        XzDecoder {
233            data: stream,
234            obj: Some(obj),
235            buf: Vec::with_capacity(32 * 1024),
236        }
237    }
238
239    /// Acquires a reference to the underlying writer.
240    #[inline]
241    pub fn get_ref(&self) -> &W {
242        self.obj.as_ref().unwrap()
243    }
244
245    /// Acquires a mutable reference to the underlying writer.
246    ///
247    /// Note that mutating the output/input state of the stream may corrupt this
248    /// object, so care must be taken when using this method.
249    #[inline]
250    pub fn get_mut(&mut self) -> &mut W {
251        self.obj.as_mut().unwrap()
252    }
253
254    fn dump(&mut self) -> io::Result<()> {
255        self.obj.as_mut().unwrap().write_all(&self.buf)?;
256        self.buf.clear();
257        Ok(())
258    }
259
260    /// Attempt to finish this output stream, writing out final chunks of data.
261    ///
262    /// Note that this function can only be used once data has finished being
263    /// written to the output stream. After this function is called then further
264    /// calls to `write` may result in a panic.
265    ///
266    /// # Panics
267    ///
268    /// Attempts to write data to this stream may result in a panic after this
269    /// function is called.
270    #[inline]
271    pub fn try_finish(&mut self) -> io::Result<()> {
272        loop {
273            self.dump()?;
274            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
275
276            // When decoding a truncated file, XZ returns LZMA_BUF_ERROR and
277            // decodes no new data, which corresponds to this crate's MemNeeded
278            // status.  Since we're finishing, we cannot provide more data so
279            // this is an error.
280            //
281            // See the 02_decompress.c example in xz-utils.
282            if self.buf.is_empty() && res == Status::MemNeeded {
283                let msg = "xz compressed stream is truncated or otherwise corrupt";
284                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
285            }
286
287            if res == Status::StreamEnd {
288                break;
289            }
290        }
291        self.dump()
292    }
293
294    /// Consumes this decoder, finishing the decompression stream.
295    ///
296    /// This will finish the underlying data stream and then return the contained
297    /// writer if the finish succeeded.
298    ///
299    /// Note that this function may not be suitable to call in a situation where
300    /// the underlying stream is an asynchronous I/O stream. To finish a stream
301    /// the `try_finish` (or `shutdown`) method should be used instead. To
302    /// re-acquire ownership of a stream it is safe to call this method after
303    /// `try_finish` or `shutdown` has returned `Ok`.
304    #[inline]
305    pub fn finish(mut self) -> io::Result<W> {
306        self.try_finish()?;
307        Ok(self.obj.take().unwrap())
308    }
309
310    /// Returns the number of bytes produced by the decompressor
311    ///
312    /// Note that, due to buffering, this only bears any relation to
313    /// `total_in()` after a call to `flush()`.  At that point,
314    /// `total_in() / total_out()` is the compression ratio.
315    #[inline]
316    pub fn total_out(&self) -> u64 {
317        self.data.total_out()
318    }
319
320    /// Returns the number of bytes consumed by the decompressor
321    /// (e.g. the number of bytes written to this stream.)
322    #[inline]
323    pub fn total_in(&self) -> u64 {
324        self.data.total_in()
325    }
326
327    /// Convert to [AutoFinishXzDecoder] that impl [Drop] trait.
328    /// [AutoFinishXzDecoder] automatically calls [XzDecoder::try_finish] method when exiting the scope.
329    #[inline]
330    pub fn auto_finish(self) -> AutoFinishXzDecoder<W> {
331        AutoFinishXzDecoder(self)
332    }
333}
334
335impl<W: Write> Write for XzDecoder<W> {
336    #[inline]
337    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
338        loop {
339            self.dump()?;
340
341            let before = self.total_in();
342            let res = self.data.process_vec(data, &mut self.buf, Action::Run)?;
343            let written = (self.total_in() - before) as usize;
344
345            if written > 0 || data.is_empty() || res == Status::StreamEnd {
346                return Ok(written);
347            }
348        }
349    }
350
351    #[inline]
352    fn flush(&mut self) -> io::Result<()> {
353        self.dump()?;
354        self.obj.as_mut().unwrap().flush()
355    }
356}
357
358impl<W: Read + Write> Read for XzDecoder<W> {
359    #[inline]
360    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
361        self.get_mut().read(buf)
362    }
363}
364
365impl<W: Write> Drop for XzDecoder<W> {
366    #[inline]
367    fn drop(&mut self) {
368        if self.obj.is_some() {
369            let _ = self.try_finish();
370        }
371    }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::LzmaOptions;
    use quickcheck::quickcheck;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Round-trips a short prefix followed by a large repetitive payload
    /// through an encoder chained into a decoder.
    #[test]
    fn smoke() {
        let payload = "12345".repeat(100000);

        let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
        encoder.write_all(b"12834").unwrap();
        encoder.write_all(payload.as_bytes()).unwrap();

        let decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();

        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        assert_eq!(format!("12834{}", payload).as_bytes(), &*data);
    }

    /// An empty write followed by finishing both stages yields no output.
    #[test]
    fn write_empty() {
        let mut encoder = XzEncoder::new(XzDecoder::new(Vec::new()), 6);
        encoder.write(b"").unwrap();

        let decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    /// Property test: arbitrary bytes survive an LZMA1 encode/decode chain.
    #[test]
    fn qc_lzma1() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decode_stream = Stream::new_lzma_decoder(u64::MAX).unwrap();
            let decoder = XzDecoder::new_stream(Vec::new(), decode_stream);

            let options = LzmaOptions::new_preset(6).unwrap();
            let encode_stream = Stream::new_lzma_encoder(&options).unwrap();
            let mut encoder = XzEncoder::new_stream(decoder, encode_stream);

            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }

    /// Property test: arbitrary bytes survive the default xz encode/decode chain.
    #[test]
    fn qc() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new(Vec::new());
            let mut encoder = XzEncoder::new(decoder, 6);

            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }

    /// Property test: parallel encoder output is decoded by the default decoder.
    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_encode() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new(Vec::new());
            let mut encoder = XzEncoder::new_parallel(decoder, 6);

            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }

    /// Property test: default encoder output is decoded by the parallel decoder.
    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_decode() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new_parallel(Vec::new());
            let mut encoder = XzEncoder::new(decoder, 6);

            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }
}