1mod auto_finish;
4
5use std::io;
6use std::io::prelude::*;
7
8#[cfg(feature = "parallel")]
9use crate::stream::MtStreamBuilder;
10use crate::stream::{Action, Check, Status, Stream};
11pub use auto_finish::{AutoFinishXzDecoder, AutoFinishXzEncoder};
12
13pub struct XzEncoder<W: Write> {
18    data: Stream,
19    obj: Option<W>,
20    buf: Vec<u8>,
21}
22
23pub struct XzDecoder<W: Write> {
28    data: Stream,
29    obj: Option<W>,
30    buf: Vec<u8>,
31}
32
33impl<W: Write> XzEncoder<W> {
34    #[inline]
37    pub fn new(obj: W, level: u32) -> XzEncoder<W> {
38        let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
39        XzEncoder::new_stream(obj, stream)
40    }
41    #[cfg(feature = "parallel")]
44    pub fn new_parallel(obj: W, level: u32) -> XzEncoder<W> {
45        let stream = MtStreamBuilder::new()
46            .preset(level)
47            .check(Check::Crc64)
48            .threads(num_cpus::get() as u32)
49            .encoder()
50            .unwrap();
51        Self::new_stream(obj, stream)
52    }
53
54    #[inline]
57    pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> {
58        XzEncoder {
59            data: stream,
60            obj: Some(obj),
61            buf: Vec::with_capacity(32 * 1024),
62        }
63    }
64
65    #[inline]
67    pub fn get_ref(&self) -> &W {
68        self.obj.as_ref().unwrap()
69    }
70
71    #[inline]
76    pub fn get_mut(&mut self) -> &mut W {
77        self.obj.as_mut().unwrap()
78    }
79
80    fn dump(&mut self) -> io::Result<()> {
81        self.obj.as_mut().unwrap().write_all(&self.buf)?;
82        self.buf.clear();
83        Ok(())
84    }
85
86    #[inline]
97    pub fn try_finish(&mut self) -> io::Result<()> {
98        loop {
99            self.dump()?;
100            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
101            if res == Status::StreamEnd {
102                break;
103            }
104        }
105        self.dump()
106    }
107
108    #[inline]
119    pub fn finish(mut self) -> io::Result<W> {
120        self.try_finish()?;
121        Ok(self.obj.take().unwrap())
122    }
123
124    #[inline]
130    pub fn total_out(&self) -> u64 {
131        self.data.total_out()
132    }
133
134    #[inline]
137    pub fn total_in(&self) -> u64 {
138        self.data.total_in()
139    }
140
141    #[inline]
144    pub fn auto_finish(self) -> AutoFinishXzEncoder<W> {
145        AutoFinishXzEncoder(self)
146    }
147}
148
149impl<W: Write> Write for XzEncoder<W> {
150    #[inline]
151    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
152        loop {
153            self.dump()?;
154
155            let total_in = self.total_in();
156            self.data.process_vec(data, &mut self.buf, Action::Run)?;
157            let written = (self.total_in() - total_in) as usize;
158
159            if written > 0 || data.is_empty() {
160                return Ok(written);
161            }
162        }
163    }
164
165    #[inline]
166    fn flush(&mut self) -> io::Result<()> {
167        loop {
168            self.dump()?;
169            let status = self
170                .data
171                .process_vec(&[], &mut self.buf, Action::FullFlush)?;
172            if status == Status::StreamEnd {
173                break;
174            }
175        }
176        self.obj.as_mut().unwrap().flush()
177    }
178}
179
180impl<W: Read + Write> Read for XzEncoder<W> {
181    #[inline]
182    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
183        self.get_mut().read(buf)
184    }
185}
186
187impl<W: Write> Drop for XzEncoder<W> {
188    #[inline]
189    fn drop(&mut self) {
190        if self.obj.is_some() {
191            let _ = self.try_finish();
192        }
193    }
194}
195
196impl<W: Write> XzDecoder<W> {
197    #[inline]
200    pub fn new(obj: W) -> XzDecoder<W> {
201        let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
202        XzDecoder::new_stream(obj, stream)
203    }
204
205    #[cfg(feature = "parallel")]
208    pub fn new_parallel(obj: W) -> Self {
209        let stream = MtStreamBuilder::new()
210            .memlimit_stop(u64::MAX)
211            .threads(num_cpus::get() as u32)
212            .decoder()
213            .unwrap();
214        Self::new_stream(obj, stream)
215    }
216
217    #[inline]
220    pub fn new_multi_decoder(obj: W) -> XzDecoder<W> {
221        let stream = Stream::new_stream_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
222        XzDecoder::new_stream(obj, stream)
223    }
224
225    #[inline]
231    pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> {
232        XzDecoder {
233            data: stream,
234            obj: Some(obj),
235            buf: Vec::with_capacity(32 * 1024),
236        }
237    }
238
239    #[inline]
241    pub fn get_ref(&self) -> &W {
242        self.obj.as_ref().unwrap()
243    }
244
245    #[inline]
250    pub fn get_mut(&mut self) -> &mut W {
251        self.obj.as_mut().unwrap()
252    }
253
254    fn dump(&mut self) -> io::Result<()> {
255        self.obj.as_mut().unwrap().write_all(&self.buf)?;
256        self.buf.clear();
257        Ok(())
258    }
259
260    #[inline]
271    pub fn try_finish(&mut self) -> io::Result<()> {
272        loop {
273            self.dump()?;
274            let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
275
276            if self.buf.is_empty() && res == Status::MemNeeded {
283                let msg = "xz compressed stream is truncated or otherwise corrupt";
284                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
285            }
286
287            if res == Status::StreamEnd {
288                break;
289            }
290        }
291        self.dump()
292    }
293
294    #[inline]
305    pub fn finish(mut self) -> io::Result<W> {
306        self.try_finish()?;
307        Ok(self.obj.take().unwrap())
308    }
309
310    #[inline]
316    pub fn total_out(&self) -> u64 {
317        self.data.total_out()
318    }
319
320    #[inline]
323    pub fn total_in(&self) -> u64 {
324        self.data.total_in()
325    }
326
327    #[inline]
330    pub fn auto_finish(self) -> AutoFinishXzDecoder<W> {
331        AutoFinishXzDecoder(self)
332    }
333}
334
335impl<W: Write> Write for XzDecoder<W> {
336    #[inline]
337    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
338        loop {
339            self.dump()?;
340
341            let before = self.total_in();
342            let res = self.data.process_vec(data, &mut self.buf, Action::Run)?;
343            let written = (self.total_in() - before) as usize;
344
345            if written > 0 || data.is_empty() || res == Status::StreamEnd {
346                return Ok(written);
347            }
348        }
349    }
350
351    #[inline]
352    fn flush(&mut self) -> io::Result<()> {
353        self.dump()?;
354        self.obj.as_mut().unwrap().flush()
355    }
356}
357
358impl<W: Read + Write> Read for XzDecoder<W> {
359    #[inline]
360    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
361        self.get_mut().read(buf)
362    }
363}
364
365impl<W: Write> Drop for XzDecoder<W> {
366    #[inline]
367    fn drop(&mut self) {
368        if self.obj.is_some() {
369            let _ = self.try_finish();
370        }
371    }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::LzmaOptions;
    use quickcheck::quickcheck;
    use std::iter::repeat;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Compresses a short prefix plus a large repetitive payload straight
    /// into a decoder and checks the decoded bytes match the input.
    #[test]
    fn smoke() {
        let decoder = XzDecoder::new(Vec::new());
        let mut encoder = XzEncoder::new(decoder, 6);
        encoder.write_all(b"12834").unwrap();
        let payload: String = repeat("12345").take(100000).collect();
        encoder.write_all(payload.as_bytes()).unwrap();

        let decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        assert_eq!(format!("12834{}", payload).as_bytes(), &*data);
    }

    /// An empty write followed by `finish` must round-trip to empty output.
    #[test]
    fn write_empty() {
        let decoder = XzDecoder::new(Vec::new());
        let mut encoder = XzEncoder::new(decoder, 6);
        encoder.write(b"").unwrap();

        let decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    /// Property test: arbitrary bytes survive an LZMA1 encode/decode chain.
    #[test]
    fn qc_lzma1() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decode_stream = Stream::new_lzma_decoder(u64::MAX).unwrap();
            let decoder = XzDecoder::new_stream(Vec::new(), decode_stream);
            let options = LzmaOptions::new_preset(6).unwrap();
            let encode_stream = Stream::new_lzma_encoder(&options).unwrap();
            let mut encoder = XzEncoder::new_stream(decoder, encode_stream);
            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }

    /// Property test: arbitrary bytes survive an xz encode/decode chain.
    #[test]
    fn qc() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new(Vec::new());
            let mut encoder = XzEncoder::new(decoder, 6);
            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }

    /// Property test: the parallel encoder's output decodes with the default
    /// decoder.
    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_encode() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new(Vec::new());
            let mut encoder = XzEncoder::new_parallel(decoder, 6);
            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }

    /// Property test: the default encoder's output decodes with the parallel
    /// decoder.
    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_decode() {
        fn roundtrip(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new_parallel(Vec::new());
            let mut encoder = XzEncoder::new(decoder, 6);
            encoder.write_all(&v).unwrap();
            let decoded = encoder.finish().unwrap().finish().unwrap();
            v == decoded
        }

        quickcheck(roundtrip as fn(_) -> _);
    }
}