1use crate::{AvroResult, Error, error::Details, types::Value};
20use strum_macros::{EnumIter, EnumString, IntoStaticStr};
21
/// Settings for the `Deflate` codec.
///
/// Wraps the `miniz_oxide` compression level that `Codec::compress` uses when
/// it deflates a block.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct DeflateSettings {
    /// Compression level selected for the deflate encoder.
    compression_level: miniz_oxide::deflate::CompressionLevel,
}
27
28impl DeflateSettings {
29 pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30 DeflateSettings { compression_level }
31 }
32
33 pub fn compression_level(&self) -> u8 {
36 self.compression_level as u8
37 }
38}
39
40impl Default for DeflateSettings {
41 fn default() -> Self {
43 Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
44 }
45}
46
/// The compression codec applied to a block of data.
///
/// The string form of each variant (via the `strum` derives) is its kebab-case
/// name, e.g. `"null"`, `"deflate"`, `"snappy"`, `"zstandard"`, `"bzip2"`, `"xz"`.
/// Parsing a codec from its name yields the variant with default settings.
#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub enum Codec {
    /// Passes data through unchanged.
    Null,
    /// Compresses with `miniz_oxide`'s deflate implementation.
    Deflate(DeflateSettings),
    /// Compresses with raw Snappy; each compressed block is followed by the
    /// 4-byte big-endian CRC32 checksum of the uncompressed data.
    #[cfg(feature = "snappy")]
    Snappy,
    /// Compresses with the `zstd` crate.
    #[cfg(feature = "zstandard")]
    Zstandard(zstandard::ZstandardSettings),
    /// Compresses with the `bzip2` crate.
    #[cfg(feature = "bzip")]
    Bzip2(bzip::Bzip2Settings),
    /// Compresses with the `liblzma` crate.
    #[cfg(feature = "xz")]
    Xz(xz::XzSettings),
}
74
75impl From<Codec> for Value {
76 fn from(value: Codec) -> Self {
77 Self::Bytes(<&str>::from(value).as_bytes().to_vec())
78 }
79}
80
81impl Codec {
82 pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
84 match self {
85 Codec::Null => (),
86 Codec::Deflate(settings) => {
87 let compressed =
88 miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
89 *stream = compressed;
90 }
91 #[cfg(feature = "snappy")]
92 Codec::Snappy => {
93 let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
94 let compressed_size = snap::raw::Encoder::new()
95 .compress(&stream[..], &mut encoded[..])
96 .map_err(Details::SnappyCompress)?;
97
98 let mut hasher = crc32fast::Hasher::new();
99 hasher.update(&stream[..]);
100 let checksum = hasher.finalize();
101 let checksum_as_bytes = checksum.to_be_bytes();
102 let checksum_len = checksum_as_bytes.len();
103 encoded.truncate(compressed_size + checksum_len);
104 encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
105
106 *stream = encoded;
107 }
108 #[cfg(feature = "zstandard")]
109 Codec::Zstandard(settings) => {
110 use std::io::Write;
111 let mut encoder =
112 zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
113 encoder.write_all(stream).map_err(Details::ZstdCompress)?;
114 *stream = encoder.finish().unwrap();
115 }
116 #[cfg(feature = "bzip")]
117 Codec::Bzip2(settings) => {
118 use bzip2::read::BzEncoder;
119 use std::io::Read;
120
121 let mut encoder = BzEncoder::new(&stream[..], settings.compression());
122 let mut buffer = Vec::new();
123 encoder.read_to_end(&mut buffer).unwrap();
124 *stream = buffer;
125 }
126 #[cfg(feature = "xz")]
127 Codec::Xz(settings) => {
128 use liblzma::read::XzEncoder;
129 use std::io::Read;
130
131 let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
132 let mut buffer = Vec::new();
133 encoder.read_to_end(&mut buffer).unwrap();
134 *stream = buffer;
135 }
136 };
137
138 Ok(())
139 }
140
141 pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
143 *stream = match self {
144 Codec::Null => return Ok(()),
145 Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
146 let err = {
147 use miniz_oxide::inflate::TINFLStatus::*;
148 use std::io::{Error,ErrorKind};
149 match e.status {
150 FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
151 BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), Adler32Mismatch => Error::from(ErrorKind::InvalidData),
153 Failed => Error::from(ErrorKind::InvalidData),
154 Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
155 NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
156 HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), other => Error::other(format!("Unexpected error: {other:?}"))
158 }
159 };
160 Error::new(Details::DeflateDecompress(err))
161 })?,
162 #[cfg(feature = "snappy")]
163 Codec::Snappy => {
164 let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
165 .map_err(Details::GetSnappyDecompressLen)?;
166 let mut decoded = vec![0; decompressed_size];
167 snap::raw::Decoder::new()
168 .decompress(&stream[..stream.len() - 4], &mut decoded[..])
169 .map_err(Details::SnappyDecompress)?;
170
171 let mut last_four: [u8; 4] = [0; 4];
172 last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
173 let expected: u32 = u32::from_be_bytes(last_four);
174
175 let mut hasher = crc32fast::Hasher::new();
176 hasher.update(&decoded);
177 let actual = hasher.finalize();
178
179 if expected != actual {
180 return Err(Details::SnappyCrc32{expected, actual}.into());
181 }
182 decoded
183 }
184 #[cfg(feature = "zstandard")]
185 Codec::Zstandard(_settings) => {
186 use std::io::BufReader;
187 use zstd::zstd_safe;
188
189 let mut decoded = Vec::new();
190 let buffer_size = zstd_safe::DCtx::in_size();
191 let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
192 let mut decoder = zstd::Decoder::new(buffer).unwrap();
193 std::io::copy(&mut decoder, &mut decoded).map_err(Details::ZstdDecompress)?;
194 decoded
195 }
196 #[cfg(feature = "bzip")]
197 Codec::Bzip2(_) => {
198 use bzip2::read::BzDecoder;
199 use std::io::Read;
200
201 let mut decoder = BzDecoder::new(&stream[..]);
202 let mut decoded = Vec::new();
203 decoder.read_to_end(&mut decoded).unwrap();
204 decoded
205 }
206 #[cfg(feature = "xz")]
207 Codec::Xz(_) => {
208 use liblzma::read::XzDecoder;
209 use std::io::Read;
210
211 let mut decoder = XzDecoder::new(&stream[..]);
212 let mut decoded: Vec<u8> = Vec::new();
213 decoder.read_to_end(&mut decoded).unwrap();
214 decoded
215 }
216 };
217 Ok(())
218 }
219}
220
221#[cfg(feature = "bzip")]
222pub mod bzip {
223 use bzip2::Compression;
224
225 #[derive(Clone, Copy, Eq, PartialEq, Debug)]
226 pub struct Bzip2Settings {
227 pub compression_level: u8,
228 }
229
230 impl Bzip2Settings {
231 pub fn new(compression_level: u8) -> Self {
232 Self { compression_level }
233 }
234
235 pub(crate) fn compression(&self) -> Compression {
236 Compression::new(self.compression_level as u32)
237 }
238 }
239
240 impl Default for Bzip2Settings {
241 fn default() -> Self {
242 Bzip2Settings::new(Compression::best().level() as u8)
243 }
244 }
245}
246
247#[cfg(feature = "zstandard")]
pub mod zstandard {
    /// Settings for the Zstandard codec.
    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
    pub struct ZstandardSettings {
        /// Compression level; widened to `i32` and handed to the zstd encoder.
        pub compression_level: u8,
    }

    impl ZstandardSettings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }

    impl Default for ZstandardSettings {
        /// Uses compression level `0`.
        fn default() -> Self {
            ZstandardSettings {
                compression_level: 0,
            }
        }
    }
}
266
267#[cfg(feature = "xz")]
pub mod xz {
    /// Settings for the Xz codec.
    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
    pub struct XzSettings {
        /// Compression level; widened to `u32` and handed to the xz encoder.
        pub compression_level: u8,
    }

    impl XzSettings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }

    impl Default for XzSettings {
        /// Uses compression level `9`.
        fn default() -> Self {
            Self {
                compression_level: 9,
            }
        }
    }
}
286
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Sample payload shared by the round-trip tests.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Round-trips `INPUT` through `codec`: the compressed form must differ
    /// from and be shorter than the input, and decompressing must restore it.
    fn compress_and_decompress(codec: Codec) -> TestResult {
        let mut buffer = Vec::from(INPUT);
        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, buffer.as_slice());
        assert!(INPUT.len() > buffer.len());
        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());
        Ok(())
    }

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        // The null codec must leave the data untouched in both directions.
        let mut buffer = Vec::from(INPUT);
        Codec::Null.compress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());
        Codec::Null.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());
        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        let settings = DeflateSettings::new(CompressionLevel::BestCompression);
        compress_and_decompress(Codec::Deflate(settings))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        let settings = zstandard::ZstandardSettings::default();
        compress_and_decompress(Codec::Zstandard(settings))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        let settings = bzip::Bzip2Settings::default();
        compress_and_decompress(Codec::Bzip2(settings))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        let settings = xz::XzSettings::default();
        compress_and_decompress(Codec::Xz(settings))
    }

    #[test]
    fn codec_to_str() {
        // Every codec converts to its kebab-case name.
        let name_of = |codec: Codec| -> &'static str { codec.into() };

        assert_eq!(name_of(Codec::Null), "null");
        assert_eq!(name_of(Codec::Deflate(DeflateSettings::default())), "deflate");

        #[cfg(feature = "snappy")]
        assert_eq!(name_of(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            name_of(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            name_of(Codec::Bzip2(bzip::Bzip2Settings::default())),
            "bzip2"
        );

        #[cfg(feature = "xz")]
        assert_eq!(name_of(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    #[test]
    fn codec_from_str() {
        // Parsing a codec name yields the variant carrying default settings.
        let parse = |name: &str| name.parse::<Codec>();

        assert_eq!(parse("null").unwrap(), Codec::Null);
        assert_eq!(
            parse("deflate").unwrap(),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!(parse("snappy").unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            parse("zstandard").unwrap(),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            parse("bzip2").unwrap(),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(parse("xz").unwrap(), Codec::Xz(xz::XzSettings::default()));

        assert!(parse("not a codec").is_err());
    }
}