liblzma/
bufread.rs

1//! I/O streams for wrapping `BufRead` types as encoders/decoders
2
3use std::io;
4use std::io::prelude::*;
5
6#[cfg(feature = "parallel")]
7use crate::stream::MtStreamBuilder;
8use crate::stream::{Action, Check, Status, Stream};
9
10/// A xz encoder, or compressor.
11///
12/// This structure implements a `BufRead` interface and will read uncompressed
13/// data from an underlying stream and emit a stream of compressed data.
14pub struct XzEncoder<R> {
15    obj: R,
16    data: Stream,
17}
18
19/// A xz decoder, or decompressor.
20///
21/// This structure implements a `BufRead` interface and takes a stream of
22/// compressed data as input, providing the decompressed data when read from.
23pub struct XzDecoder<R> {
24    obj: R,
25    data: Stream,
26}
27
28impl<R: BufRead> XzEncoder<R> {
29    /// Creates a new encoder which will read uncompressed data from the given
30    /// stream and emit the compressed stream.
31    ///
32    /// The `level` argument here is typically 0-9 with 6 being a good default.
33    #[inline]
34    pub fn new(r: R, level: u32) -> XzEncoder<R> {
35        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
36        XzEncoder::new_stream(r, stream)
37    }
38
39    /// Creates a new parallel encoder which will read uncompressed data from the given
40    /// stream and emit the compressed stream.
41    ///
42    /// The `level` argument here is typically 0-9 with 6 being a good default.
43    #[cfg(feature = "parallel")]
44    pub fn new_parallel(r: R, level: u32) -> XzEncoder<R> {
45        let stream = MtStreamBuilder::new()
46            .preset(level)
47            .check(Check::Crc64)
48            .threads(num_cpus::get() as u32)
49            .encoder()
50            .unwrap();
51        Self::new_stream(r, stream)
52    }
53
54    /// Creates a new encoder with a custom `Stream`.
55    ///
56    /// The `Stream` can be pre-configured for multithreaded encoding, different
57    /// compression options/tuning, etc.
58    #[inline]
59    pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
60        XzEncoder {
61            obj: r,
62            data: stream,
63        }
64    }
65}
66
67impl<R> XzEncoder<R> {
68    /// Acquires a reference to the underlying stream
69    #[inline]
70    pub fn get_ref(&self) -> &R {
71        &self.obj
72    }
73
74    /// Acquires a mutable reference to the underlying stream
75    ///
76    /// Note that mutation of the stream may result in surprising results if
77    /// this encoder is continued to be used.
78    #[inline]
79    pub fn get_mut(&mut self) -> &mut R {
80        &mut self.obj
81    }
82
83    /// Consumes this encoder, returning the underlying reader.
84    #[inline]
85    pub fn into_inner(self) -> R {
86        self.obj
87    }
88
89    /// Returns the number of bytes produced by the compressor
90    /// (e.g., the number of bytes read from this stream)
91    ///
92    /// Note that, due to buffering, this only bears any relation to
93    /// total_in() when the compressor chooses to flush its data
94    /// (unfortunately, this won't happen in general at the end of the
95    /// stream, because the compressor doesn't know if there's more data
96    /// to come).  At that point, `total_out() / total_in()` would be
97    /// the compression ratio.
98    #[inline]
99    pub fn total_out(&self) -> u64 {
100        self.data.total_out()
101    }
102
103    /// Returns the number of bytes consumed by the compressor
104    /// (e.g., the number of bytes read from the underlying stream)
105    #[inline]
106    pub fn total_in(&self) -> u64 {
107        self.data.total_in()
108    }
109}
110
111impl<R: BufRead> Read for XzEncoder<R> {
112    #[inline]
113    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
114        if buf.is_empty() {
115            return Ok(0);
116        }
117
118        loop {
119            let (read, consumed, eof, ret);
120            {
121                let input = self.obj.fill_buf()?;
122                eof = input.is_empty();
123                let before_out = self.data.total_out();
124                let before_in = self.data.total_in();
125                let action = if eof { Action::Finish } else { Action::Run };
126                ret = self.data.process(input, buf, action);
127                read = (self.data.total_out() - before_out) as usize;
128                consumed = (self.data.total_in() - before_in) as usize;
129            };
130            self.obj.consume(consumed);
131
132            ret?;
133
134            // If we haven't ready any data and we haven't hit EOF yet, then we
135            // need to keep asking for more data because if we return that 0
136            // bytes of data have been read then it will be interpreted as EOF.
137            if read == 0 && !eof {
138                continue;
139            }
140            return Ok(read);
141        }
142    }
143}
144
145impl<W: Write> Write for XzEncoder<W> {
146    #[inline]
147    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
148        self.get_mut().write(buf)
149    }
150
151    #[inline]
152    fn flush(&mut self) -> io::Result<()> {
153        self.get_mut().flush()
154    }
155}
156
157impl<R: BufRead> XzDecoder<R> {
158    /// Creates a new decoder which will decompress data read from the given
159    /// stream.
160    #[inline]
161    pub fn new(r: R) -> XzDecoder<R> {
162        let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
163        XzDecoder::new_stream(r, stream)
164    }
165
166    /// Creates a new parallel decoder which will decompress data read from the given
167    /// stream.
168    #[cfg(feature = "parallel")]
169    pub fn new_parallel(r: R) -> Self {
170        let stream = MtStreamBuilder::new()
171            .memlimit_stop(u64::MAX)
172            .threads(num_cpus::get() as u32)
173            .decoder()
174            .unwrap();
175        Self::new_stream(r, stream)
176    }
177
178    /// Creates a new decoder which will decompress data read from the given
179    /// input. All the concatenated xz streams from input will be consumed.
180    #[inline]
181    pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
182        let stream = Stream::new_auto_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
183        XzDecoder::new_stream(r, stream)
184    }
185
186    /// Creates a new decoder with a custom `Stream`.
187    ///
188    /// The `Stream` can be pre-configured for various checks, different
189    /// decompression options/tuning, etc.
190    #[inline]
191    pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
192        XzDecoder {
193            obj: r,
194            data: stream,
195        }
196    }
197}
198
199impl<R> XzDecoder<R> {
200    /// Acquires a reference to the underlying stream
201    #[inline]
202    pub fn get_ref(&self) -> &R {
203        &self.obj
204    }
205
206    /// Acquires a mutable reference to the underlying stream
207    ///
208    /// Note that mutation of the stream may result in surprising results if
209    /// this encoder is continued to be used.
210    #[inline]
211    pub fn get_mut(&mut self) -> &mut R {
212        &mut self.obj
213    }
214
215    /// Consumes this decoder, returning the underlying reader.
216    #[inline]
217    pub fn into_inner(self) -> R {
218        self.obj
219    }
220
221    /// Returns the number of bytes that the decompressor has consumed.
222    ///
223    /// Note that this will likely be smaller than what the decompressor
224    /// actually read from the underlying stream due to buffering.
225    #[inline]
226    pub fn total_in(&self) -> u64 {
227        self.data.total_in()
228    }
229
230    /// Returns the number of bytes that the decompressor has produced.
231    #[inline]
232    pub fn total_out(&self) -> u64 {
233        self.data.total_out()
234    }
235}
236
237impl<R: BufRead> Read for XzDecoder<R> {
238    #[inline]
239    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
240        if buf.is_empty() {
241            return Ok(0);
242        }
243
244        loop {
245            let (read, consumed, eof, ret);
246            {
247                let input = self.obj.fill_buf()?;
248                eof = input.is_empty();
249                let before_out = self.data.total_out();
250                let before_in = self.data.total_in();
251                ret = self
252                    .data
253                    .process(input, buf, if eof { Action::Finish } else { Action::Run });
254                read = (self.data.total_out() - before_out) as usize;
255                consumed = (self.data.total_in() - before_in) as usize;
256            }
257            self.obj.consume(consumed);
258
259            let status = ret?;
260            if read > 0 || eof || status == Status::StreamEnd {
261                if read == 0 && status != Status::StreamEnd {
262                    return Err(io::Error::new(
263                        io::ErrorKind::UnexpectedEof,
264                        "premature eof",
265                    ));
266                }
267                return Ok(read);
268            }
269            if consumed == 0 {
270                return Err(io::Error::new(
271                    io::ErrorKind::InvalidData,
272                    "corrupt xz stream",
273                ));
274            }
275        }
276    }
277}
278
279impl<W: Write> Write for XzDecoder<W> {
280    #[inline]
281    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
282        self.get_mut().write(buf)
283    }
284
285    #[inline]
286    fn flush(&mut self) -> io::Result<()> {
287        self.get_mut().flush()
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Compresses a payload and appends unrelated bytes after the compressed
    /// stream. Checks that the decoder stops at the end of the xz stream and
    /// leaves the trailing bytes unread in the reader.
    #[test]
    fn compressed_and_trailing_data() {
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        const ADDITIONAL_SIZE: usize = 123;

        // Payload is a wrapping byte counter: 0, 1, ..., 255, 0, 1, ...
        let to_compress: Vec<u8> = (0..COMPRESSED_ORIG_SIZE).map(|i| i as u8).collect();

        let mut encoder = XzEncoder::new(&to_compress[..], 6);
        let mut decoder_input = Vec::new();
        encoder.read_to_end(&mut decoder_input).unwrap();
        assert_eq!(encoder.total_in(), to_compress.len() as u64);
        assert_eq!(encoder.total_out(), decoder_input.len() as u64);

        // Unrelated bytes placed directly after the compressed stream.
        let additional_data: Vec<u8> = (0..ADDITIONAL_SIZE)
            .map(|i| ((25 + i) % 256) as u8)
            .collect();
        decoder_input.extend_from_slice(&additional_data);

        let mut decoder_reader = &decoder_input[..];
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; to_compress.len()];

            let n = decoder.read(&mut decompressed_data).unwrap();
            assert_eq!(n, COMPRESSED_ORIG_SIZE);
            assert_eq!(decompressed_data, &to_compress[..]);

            let compressed_len = (decoder_input.len() - ADDITIONAL_SIZE) as u64;
            assert_eq!(decoder.total_in(), compressed_len);
            assert_eq!(decoder.total_out(), decompressed_data.len() as u64);
        }

        // Everything after the xz stream must still be available to the reader.
        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, &additional_data[..]);
    }
}