liblzma/
write.rs

1//! Writer-based compression/decompression streams
2
3mod auto_finish;
4
5use std::io;
6use std::io::prelude::*;
7
8#[cfg(feature = "parallel")]
9use crate::stream::MtStreamBuilder;
10use crate::stream::{Action, Check, Status, Stream};
11pub use auto_finish::{AutoFinishXzDecoder, AutoFinishXzEncoder};
12
13/// A compression stream which will have uncompressed data written to it and
14/// will write compressed data to an output stream.
15/// [XzEncoder] will no longer perform the finalization automatically in the next miner release, so you need to call [XzEncoder::finish] manually.
16/// If you want to automate the finalization process, please use [XzEncoder::auto_finish].
17pub struct XzEncoder<W: Write> {
18    data: Stream,
19    obj: Option<W>,
20    buf: Vec<u8>,
21}
22
23/// A compression stream which will have compressed data written to it and
24/// will write uncompressed data to an output stream.
25/// [XzDecoder] will no longer perform the finalization automatically in the next miner release, so you need to call [XzDecoder::finish] manually.
26/// If you want to automate the finalization process, please use [XzDecoder::auto_finish].
27pub struct XzDecoder<W: Write> {
28    data: Stream,
29    obj: Option<W>,
30    buf: Vec<u8>,
31}
32
33impl<W: Write> XzEncoder<W> {
34    /// Create a new compression stream which will compress at the given level
35    /// to write compress output to the give output stream.
36    ///
37    /// The `level` argument here is typically 0-9 with 6 being a good default.
38    /// To use the slower `xz --extreme`-style preset, bitwise-OR a level with
39    /// [`crate::stream::PRESET_EXTREME`] (for example, `6 | crate::stream::PRESET_EXTREME`).
40    #[inline]
41    pub fn new(obj: W, level: u32) -> XzEncoder<W> {
42        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
43        XzEncoder::new_stream(obj, stream)
44    }
45    /// Create a new parallel compression stream which will compress at the given level
46    /// to write compress output to the give output stream.
47    ///
48    /// The `level` argument here is typically 0-9 with 6 being a good default.
49    /// To use the slower `xz --extreme`-style preset, bitwise-OR a level with
50    /// [`crate::stream::PRESET_EXTREME`] (for example, `6 | crate::stream::PRESET_EXTREME`).
51    #[cfg(feature = "parallel")]
52    pub fn new_parallel(obj: W, level: u32) -> XzEncoder<W> {
53        let stream = MtStreamBuilder::new()
54            .preset(level)
55            .check(Check::Crc64)
56            .threads(num_cpus::get() as u32)
57            .encoder()
58            .unwrap();
59        Self::new_stream(obj, stream)
60    }
61
62    /// Create a new encoder which will use the specified `Stream` to encode
63    /// (compress) data into the provided `obj`.
64    #[inline]
65    pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> {
66        XzEncoder {
67            data: stream,
68            obj: Some(obj),
69            buf: Vec::with_capacity(32 * 1024),
70        }
71    }
72
73    /// Acquires a reference to the underlying writer.
74    #[inline]
75    pub fn get_ref(&self) -> &W {
76        self.obj.as_ref().unwrap()
77    }
78
79    /// Acquires a mutable reference to the underlying writer.
80    ///
81    /// Note that mutating the output/input state of the stream may corrupt this
82    /// object, so care must be taken when using this method.
83    #[inline]
84    pub fn get_mut(&mut self) -> &mut W {
85        self.obj.as_mut().unwrap()
86    }
87
88    fn dump(&mut self) -> io::Result<()> {
89        self.obj.as_mut().unwrap().write_all(&self.buf)?;
90        self.buf.clear();
91        Ok(())
92    }
93
94    /// Attempt to finish this output stream, writing out final chunks of data.
95    ///
96    /// Note that this function can only be used once data has finished being
97    /// written to the output stream. After this function is called then further
98    /// calls to `write` may result in a panic.
99    ///
100    /// # Panics
101    ///
102    /// Attempts to write data to this stream may result in a panic after this
103    /// function is called.
104    #[inline]
105    pub fn try_finish(&mut self) -> io::Result<()> {
106        loop {
107            self.dump()?;
108            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
109            if res == Status::StreamEnd {
110                break;
111            }
112        }
113        self.dump()
114    }
115
116    /// Consumes this encoder, finishing the compression stream.
117    ///
118    /// This will finish the underlying data stream and then return the contained
119    /// writer if the finish succeeded.
120    ///
121    /// Note that this function may not be suitable to call in a situation where
122    /// the underlying stream is an asynchronous I/O stream. To finish a stream
123    /// the `try_finish` (or `shutdown`) method should be used instead. To
124    /// re-acquire ownership of a stream it is safe to call this method after
125    /// `try_finish` or `shutdown` has returned `Ok`.
126    #[inline]
127    pub fn finish(mut self) -> io::Result<W> {
128        self.try_finish()?;
129        Ok(self.obj.take().unwrap())
130    }
131
132    /// Returns the number of bytes produced by the compressor
133    ///
134    /// Note that, due to buffering, this only bears any relation to
135    /// `total_in()` after a call to `flush()`.  At that point,
136    /// `total_out() / total_in()` is the compression ratio.
137    #[inline]
138    pub fn total_out(&self) -> u64 {
139        self.data.total_out()
140    }
141
142    /// Returns the number of bytes consumed by the compressor
143    /// (e.g. the number of bytes written to this stream.)
144    #[inline]
145    pub fn total_in(&self) -> u64 {
146        self.data.total_in()
147    }
148
149    /// Convert to [AutoFinishXzEncoder] that impl [Drop] trait.
150    /// [AutoFinishXzEncoder] automatically calls [XzDecoder::try_finish] method when exiting the scope.
151    #[inline]
152    pub fn auto_finish(self) -> AutoFinishXzEncoder<W> {
153        AutoFinishXzEncoder(self)
154    }
155}
156
157impl<W: Write> Write for XzEncoder<W> {
158    #[inline]
159    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
160        loop {
161            self.dump()?;
162
163            let total_in = self.total_in();
164            self.data.process_vec(data, &mut self.buf, Action::Run)?;
165            let written = (self.total_in() - total_in) as usize;
166
167            if written > 0 || data.is_empty() {
168                return Ok(written);
169            }
170        }
171    }
172
173    #[inline]
174    fn flush(&mut self) -> io::Result<()> {
175        loop {
176            self.dump()?;
177            let status = self
178                .data
179                .process_vec(&[], &mut self.buf, Action::FullFlush)?;
180            if status == Status::StreamEnd {
181                break;
182            }
183        }
184        self.obj.as_mut().unwrap().flush()
185    }
186}
187
188impl<W: Read + Write> Read for XzEncoder<W> {
189    #[inline]
190    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
191        self.get_mut().read(buf)
192    }
193}
194
195impl<W: Write> Drop for XzEncoder<W> {
196    #[inline]
197    fn drop(&mut self) {
198        if self.obj.is_some() {
199            let _ = self.try_finish();
200        }
201    }
202}
203
204impl<W: Write> XzDecoder<W> {
205    /// Creates a new decoding stream which will decode into `obj` one xz stream
206    /// from the input written to it.
207    #[inline]
208    pub fn new(obj: W) -> XzDecoder<W> {
209        let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
210        XzDecoder::new_stream(obj, stream)
211    }
212
213    /// Creates a new parallel decoding stream which will decode into `obj` one xz stream
214    /// from the input written to it.
215    #[cfg(feature = "parallel")]
216    pub fn new_parallel(obj: W) -> Self {
217        let stream = MtStreamBuilder::new()
218            .memlimit_stop(u64::MAX)
219            .threads(num_cpus::get() as u32)
220            .decoder()
221            .unwrap();
222        Self::new_stream(obj, stream)
223    }
224
225    /// Creates a new decoding stream which will decode into `obj` all the xz streams
226    /// from the input written to it.
227    #[inline]
228    pub fn new_multi_decoder(obj: W) -> XzDecoder<W> {
229        let stream = Stream::new_stream_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
230        XzDecoder::new_stream(obj, stream)
231    }
232
233    /// Creates a new decoding stream which will decode all input written to it
234    /// into `obj`.
235    ///
236    /// A custom `stream` can be specified to configure what format this decoder
237    /// will recognize or configure other various decoding options.
238    #[inline]
239    pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> {
240        XzDecoder {
241            data: stream,
242            obj: Some(obj),
243            buf: Vec::with_capacity(32 * 1024),
244        }
245    }
246
247    /// Acquires a reference to the underlying writer.
248    #[inline]
249    pub fn get_ref(&self) -> &W {
250        self.obj.as_ref().unwrap()
251    }
252
253    /// Acquires a mutable reference to the underlying writer.
254    ///
255    /// Note that mutating the output/input state of the stream may corrupt this
256    /// object, so care must be taken when using this method.
257    #[inline]
258    pub fn get_mut(&mut self) -> &mut W {
259        self.obj.as_mut().unwrap()
260    }
261
262    fn dump(&mut self) -> io::Result<()> {
263        self.obj.as_mut().unwrap().write_all(&self.buf)?;
264        self.buf.clear();
265        Ok(())
266    }
267
268    /// Attempt to finish this output stream, writing out final chunks of data.
269    ///
270    /// Note that this function can only be used once data has finished being
271    /// written to the output stream. After this function is called then further
272    /// calls to `write` may result in a panic.
273    ///
274    /// # Panics
275    ///
276    /// Attempts to write data to this stream may result in a panic after this
277    /// function is called.
278    #[inline]
279    pub fn try_finish(&mut self) -> io::Result<()> {
280        loop {
281            self.dump()?;
282            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
283
284            // When decoding a truncated file, XZ returns LZMA_BUF_ERROR and
285            // decodes no new data, which corresponds to this crate's MemNeeded
286            // status.  Since we're finishing, we cannot provide more data so
287            // this is an error.
288            //
289            // See the 02_decompress.c example in xz-utils.
290            if self.buf.is_empty() && res == Status::MemNeeded {
291                let msg = "xz compressed stream is truncated or otherwise corrupt";
292                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
293            }
294
295            if res == Status::StreamEnd {
296                break;
297            }
298        }
299        self.dump()
300    }
301
302    /// Consumes this decoder, finishing the decompression stream.
303    ///
304    /// This will finish the underlying data stream and then return the contained
305    /// writer if the finish succeeded.
306    ///
307    /// Note that this function may not be suitable to call in a situation where
308    /// the underlying stream is an asynchronous I/O stream. To finish a stream
309    /// the `try_finish` (or `shutdown`) method should be used instead. To
310    /// re-acquire ownership of a stream it is safe to call this method after
311    /// `try_finish` or `shutdown` has returned `Ok`.
312    #[inline]
313    pub fn finish(mut self) -> io::Result<W> {
314        self.try_finish()?;
315        Ok(self.obj.take().unwrap())
316    }
317
318    /// Returns the number of bytes produced by the decompressor
319    ///
320    /// Note that, due to buffering, this only bears any relation to
321    /// `total_in()` after a call to `flush()`.  At that point,
322    /// `total_in() / total_out()` is the compression ratio.
323    #[inline]
324    pub fn total_out(&self) -> u64 {
325        self.data.total_out()
326    }
327
328    /// Returns the number of bytes consumed by the decompressor
329    /// (e.g. the number of bytes written to this stream.)
330    #[inline]
331    pub fn total_in(&self) -> u64 {
332        self.data.total_in()
333    }
334
335    /// Convert to [AutoFinishXzDecoder] that impl [Drop] trait.
336    /// [AutoFinishXzDecoder] automatically calls [XzDecoder::try_finish] method when exiting the scope.
337    #[inline]
338    pub fn auto_finish(self) -> AutoFinishXzDecoder<W> {
339        AutoFinishXzDecoder(self)
340    }
341}
342
343impl<W: Write> Write for XzDecoder<W> {
344    #[inline]
345    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
346        loop {
347            self.dump()?;
348
349            let before = self.total_in();
350            let res = self.data.process_vec(data, &mut self.buf, Action::Run)?;
351            let written = (self.total_in() - before) as usize;
352
353            if written > 0 || data.is_empty() || res == Status::StreamEnd {
354                return Ok(written);
355            }
356        }
357    }
358
359    #[inline]
360    fn flush(&mut self) -> io::Result<()> {
361        self.dump()?;
362        self.obj.as_mut().unwrap().flush()
363    }
364}
365
366impl<W: Read + Write> Read for XzDecoder<W> {
367    #[inline]
368    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
369        self.get_mut().read(buf)
370    }
371}
372
373impl<W: Write> Drop for XzDecoder<W> {
374    #[inline]
375    fn drop(&mut self) {
376        if self.obj.is_some() {
377            let _ = self.try_finish();
378        }
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::{LzmaOptions, PRESET_EXTREME};
    use quickcheck::quickcheck;
    use std::iter::repeat;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    #[test]
    fn smoke() {
        let d = XzDecoder::new(Vec::new());
        let mut c = XzEncoder::new(d, 6);
        c.write_all(b"12834").unwrap();
        let s = repeat("12345").take(100000).collect::<String>();
        c.write_all(s.as_bytes()).unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        assert_eq!(format!("12834{}", s).as_bytes(), &*data);
    }

    #[test]
    fn write_empty() {
        let d = XzDecoder::new(Vec::new());
        let mut c = XzEncoder::new(d, 6);
        c.write(b"").unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    #[test]
    fn extreme_preset_round_trip() {
        let d = XzDecoder::new(Vec::new());
        let mut c = XzEncoder::new(d, 6 | PRESET_EXTREME);
        let input = vec![11u8; 128 * 1024 + 1];
        c.write_all(&input).unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(data, input);
    }

    /// A mid-stream `flush` (full flush) must not break the round trip, and
    /// writing must be able to continue afterwards.
    #[test]
    fn flush_mid_stream_round_trip() {
        let d = XzDecoder::new(Vec::new());
        let mut c = XzEncoder::new(d, 6);
        c.write_all(b"first half, ").unwrap();
        c.flush().unwrap();
        c.write_all(b"second half").unwrap();
        let data = c.finish().unwrap().finish().unwrap();
        assert_eq!(&data[..], b"first half, second half");
    }

    /// Finishing a decoder fed a truncated stream must fail with
    /// `UnexpectedEof` rather than silently succeeding.
    #[test]
    fn truncated_input_is_unexpected_eof() {
        let mut c = XzEncoder::new(Vec::new(), 6);
        c.write_all(b"some data that will be truncated").unwrap();
        let compressed = c.finish().unwrap();

        let mut d = XzDecoder::new(Vec::new());
        d.write_all(&compressed[..compressed.len() - 1]).unwrap();
        let err = d.finish().unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
    }

    /// `new_multi_decoder` decodes back-to-back xz streams into one output.
    #[test]
    fn multi_decoder_concatenated_streams() {
        let mut first = XzEncoder::new(Vec::new(), 6);
        first.write_all(b"hello ").unwrap();
        let mut compressed = first.finish().unwrap();

        let mut second = XzEncoder::new(Vec::new(), 6);
        second.write_all(b"world").unwrap();
        compressed.extend_from_slice(&second.finish().unwrap());

        let mut d = XzDecoder::new_multi_decoder(Vec::new());
        d.write_all(&compressed).unwrap();
        let data = d.finish().unwrap();
        assert_eq!(&data[..], b"hello world");
    }

    #[test]
    fn qc_lzma1() {
        quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let stream = Stream::new_lzma_decoder(u64::MAX).unwrap();
            let w = XzDecoder::new_stream(Vec::new(), stream);
            let options = LzmaOptions::new_preset(6).unwrap();
            let stream = Stream::new_lzma_encoder(&options).unwrap();
            let mut w = XzEncoder::new_stream(w, stream);
            w.write_all(&v).unwrap();
            v == w.finish().unwrap().finish().unwrap()
        }
    }

    #[test]
    fn qc() {
        quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let w = XzDecoder::new(Vec::new());
            let mut w = XzEncoder::new(w, 6);
            w.write_all(&v).unwrap();
            v == w.finish().unwrap().finish().unwrap()
        }
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_encode() {
        quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let w = XzDecoder::new(Vec::new());
            let mut w = XzEncoder::new_parallel(w, 6);
            w.write_all(&v).unwrap();
            v == w.finish().unwrap().finish().unwrap()
        }
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_decode() {
        quickcheck(test as fn(_) -> _);

        fn test(v: Vec<u8>) -> bool {
            let w = XzDecoder::new_parallel(Vec::new());
            let mut w = XzEncoder::new(w, 6);
            w.write_all(&v).unwrap();
            v == w.finish().unwrap().finish().unwrap()
        }
    }
}