liblzma/
bufread.rs

1//! I/O streams for wrapping `BufRead` types as encoders/decoders
2
3use std::io;
4use std::io::prelude::*;
5
6#[cfg(feature = "parallel")]
7use crate::stream::MtStreamBuilder;
8use crate::stream::{Action, Check, Status, Stream};
9
10/// A xz encoder, or compressor.
11///
12/// This structure implements a `BufRead` interface and will read uncompressed
13/// data from an underlying stream and emit a stream of compressed data.
14pub struct XzEncoder<R> {
15    obj: R,
16    data: Stream,
17}
18
19/// A xz decoder, or decompressor.
20///
21/// This structure implements a `BufRead` interface and takes a stream of
22/// compressed data as input, providing the decompressed data when read from.
23pub struct XzDecoder<R> {
24    obj: R,
25    data: Stream,
26}
27
28impl<R: BufRead> XzEncoder<R> {
29    /// Creates a new encoder which will read uncompressed data from the given
30    /// stream and emit the compressed stream.
31    ///
32    /// The `level` argument here is typically 0-9 with 6 being a good default.
33    /// To use the slower `xz --extreme`-style preset, bitwise-OR a level with
34    /// [`crate::stream::PRESET_EXTREME`] (for example, `6 | crate::stream::PRESET_EXTREME`).
35    #[inline]
36    pub fn new(r: R, level: u32) -> XzEncoder<R> {
37        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
38        XzEncoder::new_stream(r, stream)
39    }
40
41    /// Creates a new parallel encoder which will read uncompressed data from the given
42    /// stream and emit the compressed stream.
43    ///
44    /// The `level` argument here is typically 0-9 with 6 being a good default.
45    /// To use the slower `xz --extreme`-style preset, bitwise-OR a level with
46    /// [`crate::stream::PRESET_EXTREME`] (for example, `6 | crate::stream::PRESET_EXTREME`).
47    #[cfg(feature = "parallel")]
48    pub fn new_parallel(r: R, level: u32) -> XzEncoder<R> {
49        let stream = MtStreamBuilder::new()
50            .preset(level)
51            .check(Check::Crc64)
52            .threads(num_cpus::get() as u32)
53            .encoder()
54            .unwrap();
55        Self::new_stream(r, stream)
56    }
57
58    /// Creates a new encoder with a custom `Stream`.
59    ///
60    /// The `Stream` can be pre-configured for multithreaded encoding, different
61    /// compression options/tuning, etc.
62    #[inline]
63    pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
64        XzEncoder {
65            obj: r,
66            data: stream,
67        }
68    }
69}
70
71impl<R> XzEncoder<R> {
72    /// Acquires a reference to the underlying stream
73    #[inline]
74    pub fn get_ref(&self) -> &R {
75        &self.obj
76    }
77
78    /// Acquires a mutable reference to the underlying stream
79    ///
80    /// Note that mutation of the stream may result in surprising results if
81    /// this encoder is continued to be used.
82    #[inline]
83    pub fn get_mut(&mut self) -> &mut R {
84        &mut self.obj
85    }
86
87    /// Consumes this encoder, returning the underlying reader.
88    #[inline]
89    pub fn into_inner(self) -> R {
90        self.obj
91    }
92
93    /// Returns the number of bytes produced by the compressor
94    /// (e.g., the number of bytes read from this stream)
95    ///
96    /// Note that, due to buffering, this only bears any relation to
97    /// total_in() when the compressor chooses to flush its data
98    /// (unfortunately, this won't happen in general at the end of the
99    /// stream, because the compressor doesn't know if there's more data
100    /// to come).  At that point, `total_out() / total_in()` would be
101    /// the compression ratio.
102    #[inline]
103    pub fn total_out(&self) -> u64 {
104        self.data.total_out()
105    }
106
107    /// Returns the number of bytes consumed by the compressor
108    /// (e.g., the number of bytes read from the underlying stream)
109    #[inline]
110    pub fn total_in(&self) -> u64 {
111        self.data.total_in()
112    }
113}
114
115impl<R: BufRead> Read for XzEncoder<R> {
116    #[inline]
117    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
118        if buf.is_empty() {
119            return Ok(0);
120        }
121
122        loop {
123            let (read, consumed, eof, ret);
124            {
125                let input = self.obj.fill_buf()?;
126                eof = input.is_empty();
127                let before_out = self.data.total_out();
128                let before_in = self.data.total_in();
129                let action = if eof { Action::Finish } else { Action::Run };
130                ret = self.data.process(input, buf, action);
131                read = (self.data.total_out() - before_out) as usize;
132                consumed = (self.data.total_in() - before_in) as usize;
133            };
134            self.obj.consume(consumed);
135
136            ret?;
137
138            // If we haven't ready any data and we haven't hit EOF yet, then we
139            // need to keep asking for more data because if we return that 0
140            // bytes of data have been read then it will be interpreted as EOF.
141            if read == 0 && !eof {
142                continue;
143            }
144            return Ok(read);
145        }
146    }
147}
148
149impl<W: Write> Write for XzEncoder<W> {
150    #[inline]
151    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
152        self.get_mut().write(buf)
153    }
154
155    #[inline]
156    fn flush(&mut self) -> io::Result<()> {
157        self.get_mut().flush()
158    }
159}
160
161impl<R: BufRead> XzDecoder<R> {
162    /// Creates a new decoder which will decompress data read from the given
163    /// stream.
164    #[inline]
165    pub fn new(r: R) -> XzDecoder<R> {
166        let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
167        XzDecoder::new_stream(r, stream)
168    }
169
170    /// Creates a new parallel decoder which will decompress data read from the given
171    /// stream.
172    #[cfg(feature = "parallel")]
173    pub fn new_parallel(r: R) -> Self {
174        let stream = MtStreamBuilder::new()
175            .memlimit_stop(u64::MAX)
176            .threads(num_cpus::get() as u32)
177            .decoder()
178            .unwrap();
179        Self::new_stream(r, stream)
180    }
181
182    /// Creates a new decoder which will decompress data read from the given
183    /// input. All the concatenated xz streams from input will be consumed.
184    #[inline]
185    pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
186        let stream = Stream::new_auto_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
187        XzDecoder::new_stream(r, stream)
188    }
189
190    /// Creates a new decoder with a custom `Stream`.
191    ///
192    /// The `Stream` can be pre-configured for various checks, different
193    /// decompression options/tuning, etc.
194    #[inline]
195    pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
196        XzDecoder {
197            obj: r,
198            data: stream,
199        }
200    }
201}
202
203impl<R> XzDecoder<R> {
204    /// Acquires a reference to the underlying stream
205    #[inline]
206    pub fn get_ref(&self) -> &R {
207        &self.obj
208    }
209
210    /// Acquires a mutable reference to the underlying stream
211    ///
212    /// Note that mutation of the stream may result in surprising results if
213    /// this encoder is continued to be used.
214    #[inline]
215    pub fn get_mut(&mut self) -> &mut R {
216        &mut self.obj
217    }
218
219    /// Consumes this decoder, returning the underlying reader.
220    #[inline]
221    pub fn into_inner(self) -> R {
222        self.obj
223    }
224
225    /// Returns the number of bytes that the decompressor has consumed.
226    ///
227    /// Note that this will likely be smaller than what the decompressor
228    /// actually read from the underlying stream due to buffering.
229    #[inline]
230    pub fn total_in(&self) -> u64 {
231        self.data.total_in()
232    }
233
234    /// Returns the number of bytes that the decompressor has produced.
235    #[inline]
236    pub fn total_out(&self) -> u64 {
237        self.data.total_out()
238    }
239}
240
241impl<R: BufRead> Read for XzDecoder<R> {
242    #[inline]
243    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
244        if buf.is_empty() {
245            return Ok(0);
246        }
247
248        loop {
249            let (read, consumed, eof, ret);
250            {
251                let input = self.obj.fill_buf()?;
252                eof = input.is_empty();
253                let before_out = self.data.total_out();
254                let before_in = self.data.total_in();
255                ret = self
256                    .data
257                    .process(input, buf, if eof { Action::Finish } else { Action::Run });
258                read = (self.data.total_out() - before_out) as usize;
259                consumed = (self.data.total_in() - before_in) as usize;
260            }
261            self.obj.consume(consumed);
262
263            let status = ret?;
264            if read > 0 || eof || status == Status::StreamEnd {
265                if read == 0 && status != Status::StreamEnd {
266                    return Err(io::Error::new(
267                        io::ErrorKind::UnexpectedEof,
268                        "premature eof",
269                    ));
270                }
271                return Ok(read);
272            }
273            if consumed == 0 {
274                return Err(io::Error::new(
275                    io::ErrorKind::InvalidData,
276                    "corrupt xz stream",
277                ));
278            }
279        }
280    }
281}
282
283impl<W: Write> Write for XzDecoder<W> {
284    #[inline]
285    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
286        self.get_mut().write(buf)
287    }
288
289    #[inline]
290    fn flush(&mut self) -> io::Result<()> {
291        self.get_mut().flush()
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// A single-stream decoder must decode the whole xz stream and leave any
    /// unrelated trailing bytes unread in the underlying reader.
    #[test]
    fn compressed_and_trailing_data() {
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        const ADDITIONAL_SIZE: usize = 123;

        // Make a vector with compressed data...
        let to_compress: Vec<u8> = (0..COMPRESSED_ORIG_SIZE).map(|num| num as u8).collect();
        let mut encoder = XzEncoder::new(&to_compress[..], 6);

        let mut decoder_input = Vec::new();
        encoder.read_to_end(&mut decoder_input).unwrap();

        assert_eq!(encoder.total_in(), to_compress.len() as u64);
        assert_eq!(encoder.total_out(), decoder_input.len() as u64);
        let compressed_len = decoder_input.len();

        // ...plus additional unrelated trailing data
        let additional_data: Vec<u8> = (0..ADDITIONAL_SIZE)
            .map(|num| ((25 + num) % 256) as u8)
            .collect();
        decoder_input.extend_from_slice(&additional_data);

        // Decoder must be able to read the compressed xz stream, and keep the trailing data.
        let mut decoder_reader = &decoder_input[..];
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);

            // `Read::read` may legally return fewer bytes than requested, so
            // drain the decoder instead of relying on a single call. This also
            // checks that reading past the end of the stream yields EOF rather
            // than an error about the trailing bytes.
            let mut decompressed_data = Vec::new();
            let nb_decompressed = decoder.read_to_end(&mut decompressed_data).unwrap();

            assert_eq!(nb_decompressed, COMPRESSED_ORIG_SIZE);
            assert_eq!(decompressed_data, to_compress);
            assert_eq!(decoder.total_in(), compressed_len as u64);
            assert_eq!(decoder.total_out(), COMPRESSED_ORIG_SIZE as u64);
        }

        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, additional_data);
    }
}