bzip2/
bufread.rs

//! I/O streams for wrapping `BufRead` types as encoders/decoders.
2
3use std::io;
4use std::io::prelude::*;
5
6use crate::{Action, Compress, Compression, Decompress, Status};
7
8/// A bz2 encoder, or compressor.
9///
10/// This structure implements a [`BufRead`] interface and will read uncompressed
11/// data from an underlying stream and emit a stream of compressed data.
12pub struct BzEncoder<R> {
13    obj: R,
14    data: Compress,
15    done: bool,
16}
17
18/// A bz2 decoder, or decompressor.
19///
20/// This structure implements a [`BufRead`] interface and takes a stream of
21/// compressed data as input, providing the decompressed data when read from.
22pub struct BzDecoder<R> {
23    obj: R,
24    data: Decompress,
25    done: bool,
26    multi: bool,
27}
28
29impl<R: BufRead> BzEncoder<R> {
30    /// Creates a new encoder which will read uncompressed data from the given
31    /// stream and emit the compressed stream.
32    pub fn new(r: R, level: Compression) -> BzEncoder<R> {
33        BzEncoder {
34            obj: r,
35            data: Compress::new(level, 30),
36            done: false,
37        }
38    }
39}
40
41impl<R> BzEncoder<R> {
42    /// Acquires a reference to the underlying stream
43    pub fn get_ref(&self) -> &R {
44        &self.obj
45    }
46
47    /// Acquires a mutable reference to the underlying stream
48    ///
49    /// Note that mutation of the stream may result in surprising results if
50    /// this encoder is continued to be used.
51    pub fn get_mut(&mut self) -> &mut R {
52        &mut self.obj
53    }
54
55    /// Consumes this encoder, returning the underlying reader.
56    pub fn into_inner(self) -> R {
57        self.obj
58    }
59
60    /// Returns the number of bytes produced by the compressor
61    /// (e.g. the number of bytes read from this stream)
62    ///
63    /// Note that, due to buffering, this only bears any relation to
64    /// total_in() when the compressor chooses to flush its data
65    /// (unfortunately, this won't happen in general
66    /// at the end of the stream, because the compressor doesn't know
67    /// if there's more data to come).  At that point,
68    /// `total_out() / total_in()` would be the compression ratio.
69    pub fn total_out(&self) -> u64 {
70        self.data.total_out()
71    }
72
73    /// Returns the number of bytes consumed by the compressor
74    /// (e.g. the number of bytes read from the underlying stream)
75    pub fn total_in(&self) -> u64 {
76        self.data.total_in()
77    }
78}
79
80impl<R: BufRead> Read for BzEncoder<R> {
81    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
82        if self.done {
83            return Ok(0);
84        }
85        loop {
86            let (read, consumed, eof, ret);
87            {
88                let input = self.obj.fill_buf()?;
89                eof = input.is_empty();
90                let before_out = self.data.total_out();
91                let before_in = self.data.total_in();
92                let action = if eof { Action::Finish } else { Action::Run };
93                ret = self.data.compress(input, buf, action);
94                read = (self.data.total_out() - before_out) as usize;
95                consumed = (self.data.total_in() - before_in) as usize;
96            }
97            self.obj.consume(consumed);
98
99            // we should never get the sequence error that's possible to be
100            // returned from compression
101            let ret = ret.unwrap();
102
103            // If we haven't ready any data and we haven't hit EOF yet, then we
104            // need to keep asking for more data because if we return that 0
105            // bytes of data have been read then it will be interpreted as EOF.
106            if read == 0 && !eof && !buf.is_empty() {
107                continue;
108            }
109            if ret == Status::StreamEnd {
110                self.done = true;
111            }
112            return Ok(read);
113        }
114    }
115}
116
117impl<W: Write> Write for BzEncoder<W> {
118    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
119        self.get_mut().write(buf)
120    }
121
122    fn flush(&mut self) -> io::Result<()> {
123        self.get_mut().flush()
124    }
125}
126
127impl<R: BufRead> BzDecoder<R> {
128    /// Creates a new decoder which will decompress data read from the given
129    /// stream.
130    pub fn new(r: R) -> BzDecoder<R> {
131        BzDecoder {
132            obj: r,
133            data: Decompress::new(false),
134            done: false,
135            multi: false,
136        }
137    }
138
139    fn multi(mut self, flag: bool) -> BzDecoder<R> {
140        self.multi = flag;
141        self
142    }
143}
144
145impl<R> BzDecoder<R> {
146    /// Acquires a reference to the underlying stream
147    pub fn get_ref(&self) -> &R {
148        &self.obj
149    }
150
151    /// Acquires a mutable reference to the underlying stream
152    ///
153    /// Note that mutation of the stream may result in surprising results if
154    /// this encoder is continued to be used.
155    pub fn get_mut(&mut self) -> &mut R {
156        &mut self.obj
157    }
158
159    /// Consumes this decoder, returning the underlying reader.
160    pub fn into_inner(self) -> R {
161        self.obj
162    }
163
164    /// Returns the number of bytes that the decompressor has consumed.
165    ///
166    /// Note that this will likely be smaller than what the decompressor
167    /// actually read from the underlying stream due to buffering.
168    pub fn total_in(&self) -> u64 {
169        self.data.total_in()
170    }
171
172    /// Returns the number of bytes that the decompressor has produced.
173    pub fn total_out(&self) -> u64 {
174        self.data.total_out()
175    }
176}
177
178impl<R: BufRead> Read for BzDecoder<R> {
179    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
180        loop {
181            if self.done && !self.multi {
182                return Ok(0);
183            }
184            let (read, consumed, remaining, ret);
185            {
186                let input = self.obj.fill_buf()?;
187                if self.done {
188                    assert!(self.multi);
189                    if input.is_empty() {
190                        // beyond last stream in multi-stream case
191                        return Ok(0);
192                    } else {
193                        // previous stream ended, more data follows => create new decompressor
194                        self.data = Decompress::new(false);
195                        self.done = false;
196                    }
197                }
198                let before_out = self.data.total_out();
199                let before_in = self.data.total_in();
200                ret = self.data.decompress(input, buf);
201                read = (self.data.total_out() - before_out) as usize;
202                consumed = (self.data.total_in() - before_in) as usize;
203                remaining = input.len() - consumed;
204            }
205            self.obj.consume(consumed);
206
207            let ret = ret.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
208            if ret == Status::StreamEnd {
209                self.done = true;
210            } else if consumed == 0 && remaining == 0 && read == 0 {
211                return Err(io::Error::new(
212                    io::ErrorKind::UnexpectedEof,
213                    "decompression not finished but EOF reached",
214                ));
215            }
216
217            if read > 0 || buf.is_empty() {
218                return Ok(read);
219            }
220        }
221    }
222}
223
224impl<W: Write> Write for BzDecoder<W> {
225    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
226        self.get_mut().write(buf)
227    }
228
229    fn flush(&mut self) -> io::Result<()> {
230        self.get_mut().flush()
231    }
232}
233
234/// A bzip2 streaming decoder that decodes all members of a multistream.
235///
236/// Wikipedia, particularly, uses bzip2 multistream for their dumps, and the
237/// `pbzip2` tool creates such data as well.
238pub struct MultiBzDecoder<R>(BzDecoder<R>);
239
240impl<R: BufRead> MultiBzDecoder<R> {
241    /// Creates a new decoder from the given reader. If the bzip2 stream contains multiple members
242    /// all will be decoded.
243    pub fn new(r: R) -> MultiBzDecoder<R> {
244        MultiBzDecoder(BzDecoder::new(r).multi(true))
245    }
246}
247
248impl<R> MultiBzDecoder<R> {
249    /// Acquires a reference to the underlying reader.
250    pub fn get_ref(&self) -> &R {
251        self.0.get_ref()
252    }
253
254    /// Acquires a mutable reference to the underlying stream.
255    ///
256    /// Note that mutation of the stream may result in surprising results if
257    /// this encoder is continued to be used.
258    pub fn get_mut(&mut self) -> &mut R {
259        self.0.get_mut()
260    }
261
262    /// Consumes this decoder, returning the underlying reader.
263    pub fn into_inner(self) -> R {
264        self.0.into_inner()
265    }
266}
267
268impl<R: BufRead> Read for MultiBzDecoder<R> {
269    fn read(&mut self, into: &mut [u8]) -> io::Result<usize> {
270        self.0.read(into)
271    }
272}
273
274impl<R: BufRead + Write> Write for MultiBzDecoder<R> {
275    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
276        self.get_mut().write(buf)
277    }
278
279    fn flush(&mut self) -> io::Result<()> {
280        self.get_mut().flush()
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::MultiBzDecoder;
    use std::io::{BufReader, Read};

    /// Regression test `bug_61`: decodes the `bug_61.bz2` fixture with the
    /// multi-stream decoder through an 8 KiB `BufReader` and checks the output
    /// byte-for-byte against `bug_61.raw`.
    #[test]
    fn bug_61() {
        let compressed: &[u8] = include_bytes!("../tests/bug_61.bz2");
        let expected: &[u8] = include_bytes!("../tests/bug_61.raw");

        let source = BufReader::with_capacity(8192, compressed);
        let mut decoder = MultiBzDecoder::new(source);

        let mut decoded = Vec::new();
        let count = decoder.read_to_end(&mut decoded).unwrap();

        assert_eq!(count, expected.len());
        assert_eq!(decoded, expected);
    }
}