1use std::io;
4use std::io::prelude::*;
5
6#[cfg(feature = "parallel")]
7use crate::stream::MtStreamBuilder;
8use crate::stream::{Action, Check, Status, Stream};
9
/// Reader adaptor that pairs an underlying object with a [`Stream`].
///
/// When `R` implements `BufRead`, the `Read` implementation pulls bytes from
/// the underlying reader, runs them through the stream, and hands back
/// whatever the stream produced. The `new` constructor sets the stream up as
/// an easy encoder.
pub struct XzEncoder<R> {
    /// The wrapped object; input bytes are taken from here.
    obj: R,
    /// Stream state that performs the actual processing.
    data: Stream,
}
18
/// Reader adaptor that pairs an underlying object with a [`Stream`].
///
/// When `R` implements `BufRead`, the `Read` implementation pulls bytes from
/// the underlying reader, runs them through the stream, and hands back
/// whatever the stream produced. The `new` constructor sets the stream up as
/// a stream decoder.
pub struct XzDecoder<R> {
    /// The wrapped object; input bytes are taken from here.
    obj: R,
    /// Stream state that performs the actual processing.
    data: Stream,
}
27
28impl<R: BufRead> XzEncoder<R> {
29 #[inline]
34 pub fn new(r: R, level: u32) -> XzEncoder<R> {
35 let stream = Stream::new_easy_encoder(level, Check::Crc64).unwrap();
36 XzEncoder::new_stream(r, stream)
37 }
38
39 #[cfg(feature = "parallel")]
44 pub fn new_parallel(r: R, level: u32) -> XzEncoder<R> {
45 let stream = MtStreamBuilder::new()
46 .preset(level)
47 .check(Check::Crc64)
48 .threads(num_cpus::get() as u32)
49 .encoder()
50 .unwrap();
51 Self::new_stream(r, stream)
52 }
53
54 #[inline]
59 pub fn new_stream(r: R, stream: Stream) -> XzEncoder<R> {
60 XzEncoder {
61 obj: r,
62 data: stream,
63 }
64 }
65}
66
67impl<R> XzEncoder<R> {
68 #[inline]
70 pub fn get_ref(&self) -> &R {
71 &self.obj
72 }
73
74 #[inline]
79 pub fn get_mut(&mut self) -> &mut R {
80 &mut self.obj
81 }
82
83 #[inline]
85 pub fn into_inner(self) -> R {
86 self.obj
87 }
88
89 #[inline]
99 pub fn total_out(&self) -> u64 {
100 self.data.total_out()
101 }
102
103 #[inline]
106 pub fn total_in(&self) -> u64 {
107 self.data.total_in()
108 }
109}
110
111impl<R: BufRead> Read for XzEncoder<R> {
112 #[inline]
113 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
114 if buf.is_empty() {
115 return Ok(0);
116 }
117
118 loop {
119 let (read, consumed, eof, ret);
120 {
121 let input = self.obj.fill_buf()?;
122 eof = input.is_empty();
123 let before_out = self.data.total_out();
124 let before_in = self.data.total_in();
125 let action = if eof { Action::Finish } else { Action::Run };
126 ret = self.data.process(input, buf, action);
127 read = (self.data.total_out() - before_out) as usize;
128 consumed = (self.data.total_in() - before_in) as usize;
129 };
130 self.obj.consume(consumed);
131
132 ret?;
133
134 if read == 0 && !eof {
138 continue;
139 }
140 return Ok(read);
141 }
142 }
143}
144
145impl<W: Write> Write for XzEncoder<W> {
146 #[inline]
147 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
148 self.get_mut().write(buf)
149 }
150
151 #[inline]
152 fn flush(&mut self) -> io::Result<()> {
153 self.get_mut().flush()
154 }
155}
156
157impl<R: BufRead> XzDecoder<R> {
158 #[inline]
161 pub fn new(r: R) -> XzDecoder<R> {
162 let stream = Stream::new_stream_decoder(u64::MAX, 0).unwrap();
163 XzDecoder::new_stream(r, stream)
164 }
165
166 #[cfg(feature = "parallel")]
169 pub fn new_parallel(r: R) -> Self {
170 let stream = MtStreamBuilder::new()
171 .memlimit_stop(u64::MAX)
172 .threads(num_cpus::get() as u32)
173 .decoder()
174 .unwrap();
175 Self::new_stream(r, stream)
176 }
177
178 #[inline]
181 pub fn new_multi_decoder(r: R) -> XzDecoder<R> {
182 let stream = Stream::new_auto_decoder(u64::MAX, liblzma_sys::LZMA_CONCATENATED).unwrap();
183 XzDecoder::new_stream(r, stream)
184 }
185
186 #[inline]
191 pub fn new_stream(r: R, stream: Stream) -> XzDecoder<R> {
192 XzDecoder {
193 obj: r,
194 data: stream,
195 }
196 }
197}
198
199impl<R> XzDecoder<R> {
200 #[inline]
202 pub fn get_ref(&self) -> &R {
203 &self.obj
204 }
205
206 #[inline]
211 pub fn get_mut(&mut self) -> &mut R {
212 &mut self.obj
213 }
214
215 #[inline]
217 pub fn into_inner(self) -> R {
218 self.obj
219 }
220
221 #[inline]
226 pub fn total_in(&self) -> u64 {
227 self.data.total_in()
228 }
229
230 #[inline]
232 pub fn total_out(&self) -> u64 {
233 self.data.total_out()
234 }
235}
236
237impl<R: BufRead> Read for XzDecoder<R> {
238 #[inline]
239 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
240 if buf.is_empty() {
241 return Ok(0);
242 }
243
244 loop {
245 let (read, consumed, eof, ret);
246 {
247 let input = self.obj.fill_buf()?;
248 eof = input.is_empty();
249 let before_out = self.data.total_out();
250 let before_in = self.data.total_in();
251 ret = self
252 .data
253 .process(input, buf, if eof { Action::Finish } else { Action::Run });
254 read = (self.data.total_out() - before_out) as usize;
255 consumed = (self.data.total_in() - before_in) as usize;
256 }
257 self.obj.consume(consumed);
258
259 let status = ret?;
260 if read > 0 || eof || status == Status::StreamEnd {
261 if read == 0 && status != Status::StreamEnd {
262 return Err(io::Error::new(
263 io::ErrorKind::UnexpectedEof,
264 "premature eof",
265 ));
266 }
267 return Ok(read);
268 }
269 if consumed == 0 {
270 return Err(io::Error::new(
271 io::ErrorKind::InvalidData,
272 "corrupt xz stream",
273 ));
274 }
275 }
276 }
277}
278
279impl<W: Write> Write for XzDecoder<W> {
280 #[inline]
281 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
282 self.get_mut().write(buf)
283 }
284
285 #[inline]
286 fn flush(&mut self) -> io::Result<()> {
287 self.get_mut().flush()
288 }
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(all(target_family = "wasm", target_os = "unknown"))]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    /// Round-trips a payload through the encoder, appends unrelated bytes to
    /// the compressed output, and checks that the decoder consumes exactly
    /// the compressed part, leaving the appended bytes in the reader.
    #[test]
    fn compressed_and_trailing_data() {
        const COMPRESSED_ORIG_SIZE: usize = 1024;
        const ADDITIONAL_SIZE: usize = 123;

        // Payload to compress: a repeating 0..=255 byte ramp.
        let to_compress: Vec<u8> = (0..COMPRESSED_ORIG_SIZE).map(|n| n as u8).collect();

        let mut encoder = XzEncoder::new(to_compress.as_slice(), 6);
        let mut decoder_input = Vec::new();
        encoder.read_to_end(&mut decoder_input).unwrap();

        assert_eq!(encoder.total_in(), to_compress.len() as u64);
        assert_eq!(encoder.total_out(), decoder_input.len() as u64);

        // Trailing bytes that are not part of the compressed stream.
        let additional_data: Vec<u8> = (0..ADDITIONAL_SIZE)
            .map(|n| ((25 + n) % 256) as u8)
            .collect();
        decoder_input.extend_from_slice(&additional_data);
        let compressed_len = decoder_input.len() - ADDITIONAL_SIZE;

        // The decoder borrows this reader mutably so that we can inspect
        // what is left in it once decoding is done.
        let mut decoder_reader = decoder_input.as_slice();
        {
            let mut decoder = XzDecoder::new(&mut decoder_reader);
            let mut decompressed_data = vec![0u8; to_compress.len()];

            let decoded_len = decoder.read(&mut decompressed_data).unwrap();
            assert_eq!(decoded_len, COMPRESSED_ORIG_SIZE);
            assert_eq!(decompressed_data, to_compress);
            assert_eq!(decoder.total_in(), compressed_len as u64);
            assert_eq!(decoder.total_out(), decompressed_data.len() as u64);
        }

        let mut remaining_data = Vec::new();
        let nb_read = decoder_reader.read_to_end(&mut remaining_data).unwrap();
        assert_eq!(nb_read, ADDITIONAL_SIZE);
        assert_eq!(remaining_data, additional_data);
    }
}