1use std::io;
4use std::io::prelude::*;
5
6#[cfg(feature = "parallel")]
7use crate::stream::MtStreamBuilder;
8use crate::stream::{Action, Check, Status, Stream};
9
10pub struct XzEncoder<R> {
15 obj: R,
16 data: Stream,
17}
18
19pub struct XzDecoder<R> {
24 obj: R,
25 data: Stream,
26}
27
28impl<R: BufRead> XzEncoder<R> {
29 #[inline]
36 pub fn new(r: R, level: u32) -> XzEncoder<R> {
37 let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
38 XzEncoder::new_stream(r, stream)
39 }
40
41 #[cfg(feature = "parallel")]
48 pub fn new_parallel(r: R, level: u32) -> XzEncoder<R> {
49 let stream = MtStreamBuilder::new()
50 .preset(level)
51 .check(Check::Crc64)
52 .threads(num_cpus::get() as u32)
53 .encoder()
54 .unwrap();
55 Self::new_stream(r, stream)
56 }
57
58 #[inline]
63 pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
64 XzEncoder {
65 obj: r,
66 data: stream,
67 }
68 }
69}
70
71impl<R> XzEncoder<R> {
72 #[inline]
74 pub fn get_ref(&self) -> &R {
75 &self.obj
76 }
77
78 #[inline]
83 pub fn get_mut(&mut self) -> &mut R {
84 &mut self.obj
85 }
86
87 #[inline]
89 pub fn into_inner(self) -> R {
90 self.obj
91 }
92
93 #[inline]
103 pub fn total_out(&self) -> u64 {
104 self.data.total_out()
105 }
106
107 #[inline]
110 pub fn total_in(&self) -> u64 {
111 self.data.total_in()
112 }
113}
114
115impl<R: BufRead> Read for XzEncoder<R> {
116 #[inline]
117 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
118 if buf.is_empty() {
119 return Ok(0);
120 }
121
122 loop {
123 let (read, consumed, eof, ret);
124 {
125 let input = self.obj.fill_buf()?;
126 eof = input.is_empty();
127 let before_out = self.data.total_out();
128 let before_in = self.data.total_in();
129 let action = if eof { Action::Finish } else { Action::Run };
130 ret = self.data.process(input, buf, action);
131 read = (self.data.total_out() - before_out) as usize;
132 consumed = (self.data.total_in() - before_in) as usize;
133 };
134 self.obj.consume(consumed);
135
136 ret?;
137
138 if read == 0 && !eof {
142 continue;
143 }
144 return Ok(read);
145 }
146 }
147}
148
149impl<W: Write> Write for XzEncoder<W> {
150 #[inline]
151 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
152 self.get_mut().write(buf)
153 }
154
155 #[inline]
156 fn flush(&mut self) -> io::Result<()> {
157 self.get_mut().flush()
158 }
159}
160
161impl<R: BufRead> XzDecoder<R> {
162 #[inline]
165 pub fn new(r: R) -> XzDecoder<R> {
166 let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
167 XzDecoder::new_stream(r, stream)
168 }
169
170 #[cfg(feature = "parallel")]
173 pub fn new_parallel(r: R) -> Self {
174 let stream = MtStreamBuilder::new()
175 .memlimit_stop(u64::MAX)
176 .threads(num_cpus::get() as u32)
177 .decoder()
178 .unwrap();
179 Self::new_stream(r, stream)
180 }
181
182 #[inline]
185 pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
186 let stream = Stream::new_auto_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
187 XzDecoder::new_stream(r, stream)
188 }
189
190 #[inline]
195 pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
196 XzDecoder {
197 obj: r,
198 data: stream,
199 }
200 }
201}
202
203impl<R> XzDecoder<R> {
204 #[inline]
206 pub fn get_ref(&self) -> &R {
207 &self.obj
208 }
209
210 #[inline]
215 pub fn get_mut(&mut self) -> &mut R {
216 &mut self.obj
217 }
218
219 #[inline]
221 pub fn into_inner(self) -> R {
222 self.obj
223 }
224
225 #[inline]
230 pub fn total_in(&self) -> u64 {
231 self.data.total_in()
232 }
233
234 #[inline]
236 pub fn total_out(&self) -> u64 {
237 self.data.total_out()
238 }
239}
240
241impl<R: BufRead> Read for XzDecoder<R> {
242 #[inline]
243 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
244 if buf.is_empty() {
245 return Ok(0);
246 }
247
248 loop {
249 let (read, consumed, eof, ret);
250 {
251 let input = self.obj.fill_buf()?;
252 eof = input.is_empty();
253 let before_out = self.data.total_out();
254 let before_in = self.data.total_in();
255 ret = self
256 .data
257 .process(input, buf, if eof { Action::Finish } else { Action::Run });
258 read = (self.data.total_out() - before_out) as usize;
259 consumed = (self.data.total_in() - before_in) as usize;
260 }
261 self.obj.consume(consumed);
262
263 let status = ret?;
264 if read > 0 || eof || status == Status::StreamEnd {
265 if read == 0 && status != Status::StreamEnd {
266 return Err(io::Error::new(
267 io::ErrorKind::UnexpectedEof,
268 "premature eof",
269 ));
270 }
271 return Ok(read);
272 }
273 if consumed == 0 {
274 return Err(io::Error::new(
275 io::ErrorKind::InvalidData,
276 "corrupt xz stream",
277 ));
278 }
279 }
280 }
281}
282
283impl<W: Write> Write for XzDecoder<W> {
284 #[inline]
285 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
286 self.get_mut().write(buf)
287 }
288
289 #[inline]
290 fn flush(&mut self) -> io::Result<()> {
291 self.get_mut().flush()
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    // Compresses a payload, appends unrelated bytes after the compressed
    // stream, and checks that the decoder returns exactly the payload while
    // leaving the appended bytes unread in the underlying reader.
    #[test]
    fn compressed_and_trailing_data() {
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        const ADDITIONAL_SIZE: usize = 123;

        // Payload: each index truncated to a byte.
        let to_compress: Vec<u8> = (0..COMPRESSED_ORIG_SIZE).map(|num| num as u8).collect();

        let mut encoder = XzEncoder::new(&to_compress[..], 6);
        let mut decoder_input = Vec::new();
        encoder.read_to_end(&mut decoder_input).unwrap();

        // The counters must agree with what went in and what came out.
        assert_eq!(encoder.total_in(), to_compress.len() as u64);
        assert_eq!(encoder.total_out(), decoder_input.len() as u64);

        // Trailing bytes that are not part of the compressed stream.
        let additional_data: Vec<u8> = (0..ADDITIONAL_SIZE)
            .map(|num| ((25 + num) % 256) as u8)
            .collect();
        decoder_input.extend(&additional_data);

        // The decoder borrows this slice reader mutably, so the reader's
        // position after decoding stays observable once the decoder is gone.
        let mut decoder_reader = &decoder_input[..];
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; to_compress.len()];

            let nb_decoded = decoder.read(&mut decompressed_data).unwrap();
            assert_eq!(nb_decoded, COMPRESSED_ORIG_SIZE);
            assert_eq!(decompressed_data, &to_compress[..]);

            let compressed_len = decoder_input.len() - ADDITIONAL_SIZE;
            assert_eq!(decoder.total_in(), compressed_len as u64);
            assert_eq!(decoder.total_out(), decompressed_data.len() as u64);
        }

        // Everything left in the reader must be exactly the trailing bytes.
        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, &additional_data[..]);
    }
}