1use crate::{AvroResult, Error, error::Details, types::Value};
20use strum::{EnumIter, EnumString, IntoStaticStr};
21
22#[derive(Clone, Copy, Eq, PartialEq, Debug)]
24pub struct DeflateSettings {
25 compression_level: miniz_oxide::deflate::CompressionLevel,
26}
27
28impl DeflateSettings {
29 pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30 DeflateSettings { compression_level }
31 }
32
33 pub fn compression_level(&self) -> u8 {
36 self.compression_level as u8
37 }
38}
39
40impl Default for DeflateSettings {
41 fn default() -> Self {
43 Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
44 }
45}
46
47#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
49#[strum(serialize_all = "kebab_case")]
50pub enum Codec {
51 Null,
53 Deflate(DeflateSettings),
57 #[cfg(feature = "snappy")]
58 Snappy,
62 #[cfg(feature = "zstandard")]
63 Zstandard(zstandard::ZstandardSettings),
65 #[cfg(feature = "bzip")]
66 Bzip2(bzip::Bzip2Settings),
69 #[cfg(feature = "xz")]
70 Xz(xz::XzSettings),
73}
74
75impl From<Codec> for Value {
76 fn from(value: Codec) -> Self {
77 Self::Bytes(<&str>::from(value).as_bytes().to_vec())
78 }
79}
80
81impl Codec {
82 pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
84 match self {
85 Codec::Null => (),
86 Codec::Deflate(settings) => {
87 let compressed =
88 miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
89 *stream = compressed;
90 }
91 #[cfg(feature = "snappy")]
92 Codec::Snappy => {
93 let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
94 let compressed_size = snap::raw::Encoder::new()
95 .compress(&stream[..], &mut encoded[..])
96 .map_err(Details::SnappyCompress)?;
97
98 let mut hasher = crc32fast::Hasher::new();
99 hasher.update(&stream[..]);
100 let checksum = hasher.finalize();
101 let checksum_as_bytes = checksum.to_be_bytes();
102 let checksum_len = checksum_as_bytes.len();
103 encoded.truncate(compressed_size + checksum_len);
104 encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
105
106 *stream = encoded;
107 }
108 #[cfg(feature = "zstandard")]
109 Codec::Zstandard(settings) => {
110 use std::io::Write;
111 let mut encoder = zstd::Encoder::new(Vec::new(), settings.compression_level as i32)
112 .map_err(Details::ZstdCompress)?;
113 encoder.write_all(stream).map_err(Details::ZstdCompress)?;
114 *stream = encoder.finish().map_err(Details::ZstdCompress)?;
115 }
116 #[cfg(feature = "bzip")]
117 Codec::Bzip2(settings) => {
118 use bzip2::read::BzEncoder;
119 use std::io::Read;
120
121 let mut encoder = BzEncoder::new(&stream[..], settings.compression());
122 let mut buffer = Vec::new();
123 encoder
124 .read_to_end(&mut buffer)
125 .unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
126 *stream = buffer;
127 }
128 #[cfg(feature = "xz")]
129 Codec::Xz(settings) => {
130 use liblzma::read::XzEncoder;
131 use std::io::Read;
132
133 let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
134 let mut buffer = Vec::new();
135 encoder
136 .read_to_end(&mut buffer)
137 .unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
138 *stream = buffer;
139 }
140 };
141
142 Ok(())
143 }
144
145 pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
147 *stream = match self {
148 Codec::Null => return Ok(()),
149 Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
150 let err = {
151 use miniz_oxide::inflate::TINFLStatus::{FailedCannotMakeProgress, BadParam, Adler32Mismatch, Failed, Done, NeedsMoreInput, HasMoreOutput};
152 use std::io::{Error,ErrorKind};
153 match e.status {
154 FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
155 BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), Adler32Mismatch => Error::from(ErrorKind::InvalidData),
157 Failed => Error::from(ErrorKind::InvalidData),
158 Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
159 NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
160 HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), other => Error::other(format!("Unexpected error: {other:?}"))
162 }
163 };
164 Error::new(Details::DeflateDecompress(err))
165 })?,
166 #[cfg(feature = "snappy")]
167 Codec::Snappy => {
168 let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
169 .map_err(Details::GetSnappyDecompressLen)?;
170 let mut decoded = vec![0; decompressed_size];
171 snap::raw::Decoder::new()
172 .decompress(&stream[..stream.len() - 4], &mut decoded[..])
173 .map_err(Details::SnappyDecompress)?;
174
175 let mut last_four: [u8; 4] = [0; 4];
176 last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
177 let expected: u32 = u32::from_be_bytes(last_four);
178
179 let mut hasher = crc32fast::Hasher::new();
180 hasher.update(&decoded);
181 let actual = hasher.finalize();
182
183 if expected != actual {
184 return Err(Details::SnappyCrc32{expected, actual}.into());
185 }
186 decoded
187 }
188 #[cfg(feature = "zstandard")]
189 Codec::Zstandard(_settings) => {
190 use std::io::BufReader;
191 use zstd::zstd_safe;
192
193 let mut decoded = Vec::new();
194 let buffer_size = zstd_safe::DCtx::in_size();
195 let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
196 let mut decoder = zstd::Decoder::new(buffer).map_err(Details::ZstdDecompress)?;
197 std::io::copy(&mut decoder, &mut decoded).map_err(Details::ZstdDecompress)?;
198 decoded
199 }
200 #[cfg(feature = "bzip")]
201 Codec::Bzip2(_) => {
202 use bzip2::read::BzDecoder;
203 use std::io::Read;
204
205 let mut decoder = BzDecoder::new(&stream[..]);
206 let mut decoded = Vec::new();
207 decoder.read_to_end(&mut decoded).unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
208 decoded
209 }
210 #[cfg(feature = "xz")]
211 Codec::Xz(_) => {
212 use liblzma::read::XzDecoder;
213 use std::io::Read;
214
215 let mut decoder = XzDecoder::new(&stream[..]);
216 let mut decoded: Vec<u8> = Vec::new();
217 decoder.read_to_end(&mut decoded).unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
218 decoded
219 }
220 };
221 Ok(())
222 }
223}
224
/// Settings for the Bzip2 codec.
#[cfg(feature = "bzip")]
pub mod bzip {
    use bzip2::Compression;

    /// Configuration of the Bzip2 codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Bzip2Settings {
        /// Compression level forwarded to `bzip2::Compression::new`.
        pub compression_level: u8,
    }

    impl Bzip2Settings {
        /// Creates settings with the given `compression_level`.
        pub fn new(compression_level: u8) -> Self {
            Bzip2Settings { compression_level }
        }

        /// Converts the stored level into the `bzip2` crate's `Compression` type.
        pub(crate) fn compression(&self) -> Compression {
            let level = u32::from(self.compression_level);
            Compression::new(level)
        }
    }

    impl Default for Bzip2Settings {
        /// Uses the level reported by `Compression::best()`.
        fn default() -> Self {
            let best = Compression::best().level();
            Self {
                compression_level: best as u8,
            }
        }
    }
}
250
/// Settings for the Zstandard codec.
#[cfg(feature = "zstandard")]
pub mod zstandard {
    /// Configuration of the Zstandard codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ZstandardSettings {
        /// Compression level passed to the Zstandard encoder.
        pub compression_level: u8,
    }

    impl ZstandardSettings {
        /// Creates settings with the given `compression_level`.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }

    impl Default for ZstandardSettings {
        /// Uses compression level `0`.
        fn default() -> Self {
            Self {
                compression_level: 0,
            }
        }
    }
}
270
/// Settings for the Xz codec.
#[cfg(feature = "xz")]
pub mod xz {
    /// Configuration of the Xz codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct XzSettings {
        /// Compression level passed to the Xz encoder.
        pub compression_level: u8,
    }

    impl XzSettings {
        /// Creates settings with the given `compression_level`.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }

    impl Default for XzSettings {
        /// Uses compression level `9`.
        fn default() -> Self {
            Self {
                compression_level: 9,
            }
        }
    }
}
290
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Repetitive sample payload; the round-trip helper expects every codec to shrink it.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Round-trips `INPUT` through `codec`, checking that compression both changes and
    /// shrinks the data and that decompression restores it.
    fn compress_and_decompress(codec: Codec) -> TestResult {
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, &buffer[..]);
        assert!(buffer.len() < INPUT.len());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);

        Ok(())
    }

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        let codec = Codec::Null;
        let mut buffer = Vec::from(INPUT);

        // The null codec must leave the data untouched in both directions.
        codec.compress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);

        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        let settings = DeflateSettings::new(CompressionLevel::BestCompression);
        compress_and_decompress(Codec::Deflate(settings))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        let settings = zstandard::ZstandardSettings::default();
        compress_and_decompress(Codec::Zstandard(settings))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        let settings = bzip::Bzip2Settings::default();
        compress_and_decompress(Codec::Bzip2(settings))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        let settings = xz::XzSettings::default();
        compress_and_decompress(Codec::Xz(settings))
    }

    #[test]
    fn codec_to_str() {
        /// Returns the static string name of `codec`.
        fn name_of(codec: Codec) -> &'static str {
            codec.into()
        }

        assert_eq!(name_of(Codec::Null), "null");
        assert_eq!(
            name_of(Codec::Deflate(DeflateSettings::default())),
            "deflate"
        );

        #[cfg(feature = "snappy")]
        assert_eq!(name_of(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            name_of(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            name_of(Codec::Bzip2(bzip::Bzip2Settings::default())),
            "bzip2"
        );

        #[cfg(feature = "xz")]
        assert_eq!(name_of(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    #[test]
    fn codec_from_str() {
        let parse = |name: &str| name.parse::<Codec>();

        assert_eq!(parse("null").unwrap(), Codec::Null);
        assert_eq!(
            parse("deflate").unwrap(),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!(parse("snappy").unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            parse("zstandard").unwrap(),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            parse("bzip2").unwrap(),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(parse("xz").unwrap(), Codec::Xz(xz::XzSettings::default()));

        assert!(parse("not a codec").is_err());
    }
}