1mod auto_finish;
4
5use std::io;
6use std::io::prelude::*;
7
8#[cfg(feature = "parallel")]
9use crate::stream::MtStreamBuilder;
10use crate::stream::{Action, Check, Status, Stream};
11pub use auto_finish::{AutoFinishXzDecoder, AutoFinishXzEncoder};
12
13pub struct XzEncoder<W: Write> {
18 data: Stream,
19 obj: Option<W>,
20 buf: Vec<u8>,
21}
22
23pub struct XzDecoder<W: Write> {
28 data: Stream,
29 obj: Option<W>,
30 buf: Vec<u8>,
31}
32
33impl<W: Write> XzEncoder<W> {
34 #[inline]
41 pub fn new(obj: W, level: u32) -> XzEncoder<W> {
42 let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
43 XzEncoder::new_stream(obj, stream)
44 }
45 #[cfg(feature = "parallel")]
52 pub fn new_parallel(obj: W, level: u32) -> XzEncoder<W> {
53 let stream = MtStreamBuilder::new()
54 .preset(level)
55 .check(Check::Crc64)
56 .threads(num_cpus::get() as u32)
57 .encoder()
58 .unwrap();
59 Self::new_stream(obj, stream)
60 }
61
62 #[inline]
65 pub fn new_stream(obj: W, stream: Stream) -> XzEncoder<W> {
66 XzEncoder {
67 data: stream,
68 obj: Some(obj),
69 buf: Vec::with_capacity(32 * 1024),
70 }
71 }
72
73 #[inline]
75 pub fn get_ref(&self) -> &W {
76 self.obj.as_ref().unwrap()
77 }
78
79 #[inline]
84 pub fn get_mut(&mut self) -> &mut W {
85 self.obj.as_mut().unwrap()
86 }
87
88 fn dump(&mut self) -> io::Result<()> {
89 self.obj.as_mut().unwrap().write_all(&self.buf)?;
90 self.buf.clear();
91 Ok(())
92 }
93
94 #[inline]
105 pub fn try_finish(&mut self) -> io::Result<()> {
106 loop {
107 self.dump()?;
108 let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
109 if res == Status::StreamEnd {
110 break;
111 }
112 }
113 self.dump()
114 }
115
116 #[inline]
127 pub fn finish(mut self) -> io::Result<W> {
128 self.try_finish()?;
129 Ok(self.obj.take().unwrap())
130 }
131
132 #[inline]
138 pub fn total_out(&self) -> u64 {
139 self.data.total_out()
140 }
141
142 #[inline]
145 pub fn total_in(&self) -> u64 {
146 self.data.total_in()
147 }
148
149 #[inline]
152 pub fn auto_finish(self) -> AutoFinishXzEncoder<W> {
153 AutoFinishXzEncoder(self)
154 }
155}
156
157impl<W: Write> Write for XzEncoder<W> {
158 #[inline]
159 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
160 loop {
161 self.dump()?;
162
163 let total_in = self.total_in();
164 self.data.process_vec(data, &mut self.buf, Action::Run)?;
165 let written = (self.total_in() - total_in) as usize;
166
167 if written > 0 || data.is_empty() {
168 return Ok(written);
169 }
170 }
171 }
172
173 #[inline]
174 fn flush(&mut self) -> io::Result<()> {
175 loop {
176 self.dump()?;
177 let status = self
178 .data
179 .process_vec(&[], &mut self.buf, Action::FullFlush)?;
180 if status == Status::StreamEnd {
181 break;
182 }
183 }
184 self.obj.as_mut().unwrap().flush()
185 }
186}
187
188impl<W: Read + Write> Read for XzEncoder<W> {
189 #[inline]
190 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
191 self.get_mut().read(buf)
192 }
193}
194
195impl<W: Write> Drop for XzEncoder<W> {
196 #[inline]
197 fn drop(&mut self) {
198 if self.obj.is_some() {
199 let _ = self.try_finish();
200 }
201 }
202}
203
204impl<W: Write> XzDecoder<W> {
205 #[inline]
208 pub fn new(obj: W) -> XzDecoder<W> {
209 let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
210 XzDecoder::new_stream(obj, stream)
211 }
212
213 #[cfg(feature = "parallel")]
216 pub fn new_parallel(obj: W) -> Self {
217 let stream = MtStreamBuilder::new()
218 .memlimit_stop(u64::MAX)
219 .threads(num_cpus::get() as u32)
220 .decoder()
221 .unwrap();
222 Self::new_stream(obj, stream)
223 }
224
225 #[inline]
228 pub fn new_multi_decoder(obj: W) -> XzDecoder<W> {
229 let stream = Stream::new_stream_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
230 XzDecoder::new_stream(obj, stream)
231 }
232
233 #[inline]
239 pub fn new_stream(obj: W, stream: Stream) -> XzDecoder<W> {
240 XzDecoder {
241 data: stream,
242 obj: Some(obj),
243 buf: Vec::with_capacity(32 * 1024),
244 }
245 }
246
247 #[inline]
249 pub fn get_ref(&self) -> &W {
250 self.obj.as_ref().unwrap()
251 }
252
253 #[inline]
258 pub fn get_mut(&mut self) -> &mut W {
259 self.obj.as_mut().unwrap()
260 }
261
262 fn dump(&mut self) -> io::Result<()> {
263 self.obj.as_mut().unwrap().write_all(&self.buf)?;
264 self.buf.clear();
265 Ok(())
266 }
267
268 #[inline]
279 pub fn try_finish(&mut self) -> io::Result<()> {
280 loop {
281 self.dump()?;
282 let res = self.data.process_vec(&[], &mut self.buf, Action::Finish)?;
283
284 if self.buf.is_empty() && res == Status::MemNeeded {
291 let msg = "xz compressed stream is truncated or otherwise corrupt";
292 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, msg));
293 }
294
295 if res == Status::StreamEnd {
296 break;
297 }
298 }
299 self.dump()
300 }
301
302 #[inline]
313 pub fn finish(mut self) -> io::Result<W> {
314 self.try_finish()?;
315 Ok(self.obj.take().unwrap())
316 }
317
318 #[inline]
324 pub fn total_out(&self) -> u64 {
325 self.data.total_out()
326 }
327
328 #[inline]
331 pub fn total_in(&self) -> u64 {
332 self.data.total_in()
333 }
334
335 #[inline]
338 pub fn auto_finish(self) -> AutoFinishXzDecoder<W> {
339 AutoFinishXzDecoder(self)
340 }
341}
342
343impl<W: Write> Write for XzDecoder<W> {
344 #[inline]
345 fn write(&mut self, data: &[u8]) -> io::Result<usize> {
346 loop {
347 self.dump()?;
348
349 let before = self.total_in();
350 let res = self.data.process_vec(data, &mut self.buf, Action::Run)?;
351 let written = (self.total_in() - before) as usize;
352
353 if written > 0 || data.is_empty() || res == Status::StreamEnd {
354 return Ok(written);
355 }
356 }
357 }
358
359 #[inline]
360 fn flush(&mut self) -> io::Result<()> {
361 self.dump()?;
362 self.obj.as_mut().unwrap().flush()
363 }
364}
365
366impl<W: Read + Write> Read for XzDecoder<W> {
367 #[inline]
368 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
369 self.get_mut().read(buf)
370 }
371}
372
373impl<W: Write> Drop for XzDecoder<W> {
374 #[inline]
375 fn drop(&mut self) {
376 if self.obj.is_some() {
377 let _ = self.try_finish();
378 }
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::{LzmaOptions, PRESET_EXTREME};
    use quickcheck::quickcheck;
    use std::iter::repeat;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    // Encoder chained into a decoder: a short prefix plus a large repetitive
    // payload must come out unchanged.
    #[test]
    fn smoke() {
        let decoder = XzDecoder::new(Vec::new());
        let mut encoder = XzEncoder::new(decoder, 6);
        encoder.write_all(b"12834").unwrap();
        let payload: String = repeat("12345").take(100000).collect();
        encoder.write_all(payload.as_bytes()).unwrap();

        let decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(&data[0..5], b"12834");
        assert_eq!(data.len(), 500005);
        assert_eq!(format!("12834{}", payload).as_bytes(), &*data);
    }

    // A single empty write followed by finish must decode to nothing.
    #[test]
    fn write_empty() {
        let decoder = XzDecoder::new(Vec::new());
        let mut encoder = XzEncoder::new(decoder, 6);
        encoder.write(b"").unwrap();

        let decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(&data[..], b"");
    }

    // Round trip with the extreme flag OR-ed into the preset.
    #[test]
    fn extreme_preset_round_trip() {
        let decoder = XzDecoder::new(Vec::new());
        let mut encoder = XzEncoder::new(decoder, 6 | PRESET_EXTREME);
        let input = vec![11u8; 128 * 1024 + 1];
        encoder.write_all(&input).unwrap();

        let decoder = encoder.finish().unwrap();
        let data = decoder.finish().unwrap();
        assert_eq!(data, input);
    }

    // Property: arbitrary bytes survive a round trip through the LZMA1
    // encoder and decoder streams.
    #[test]
    fn qc_lzma1() {
        fn round_trips(v: Vec<u8>) -> bool {
            let decode_stream = Stream::new_lzma_decoder(u64::MAX).unwrap();
            let decoder = XzDecoder::new_stream(Vec::new(), decode_stream);
            let options = LzmaOptions::new_preset(6).unwrap();
            let encode_stream = Stream::new_lzma_encoder(&options).unwrap();
            let mut encoder = XzEncoder::new_stream(decoder, encode_stream);
            encoder.write_all(&v).unwrap();
            v == encoder.finish().unwrap().finish().unwrap()
        }

        quickcheck(round_trips as fn(_) -> _);
    }

    // Property: arbitrary bytes survive a round trip through the default
    // encoder and decoder.
    #[test]
    fn qc() {
        fn round_trips(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new(Vec::new());
            let mut encoder = XzEncoder::new(decoder, 6);
            encoder.write_all(&v).unwrap();
            v == encoder.finish().unwrap().finish().unwrap()
        }

        quickcheck(round_trips as fn(_) -> _);
    }

    // Property: output of the parallel encoder decodes with the default decoder.
    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_encode() {
        fn round_trips(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new(Vec::new());
            let mut encoder = XzEncoder::new_parallel(decoder, 6);
            encoder.write_all(&v).unwrap();
            v == encoder.finish().unwrap().finish().unwrap()
        }

        quickcheck(round_trips as fn(_) -> _);
    }

    // Property: output of the default encoder decodes with the parallel decoder.
    #[cfg(feature = "parallel")]
    #[test]
    fn qc_parallel_decode() {
        fn round_trips(v: Vec<u8>) -> bool {
            let decoder = XzDecoder::new_parallel(Vec::new());
            let mut encoder = XzEncoder::new(decoder, 6);
            encoder.write_all(&v).unwrap();
            v == encoder.finish().unwrap().finish().unwrap()
        }

        quickcheck(round_trips as fn(_) -> _);
    }
}