1use crate::{types::Value, AvroResult, Error};
20use strum_macros::{EnumIter, EnumString, IntoStaticStr};
21
22#[derive(Clone, Copy, Eq, PartialEq, Debug)]
24pub struct DeflateSettings {
25 compression_level: miniz_oxide::deflate::CompressionLevel,
26}
27
28impl DeflateSettings {
29 pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30 DeflateSettings { compression_level }
31 }
32
33 fn compression_level(&self) -> u8 {
34 self.compression_level as u8
35 }
36}
37
38impl Default for DeflateSettings {
39 fn default() -> Self {
41 Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
42 }
43}
44
45#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
47#[strum(serialize_all = "kebab_case")]
48pub enum Codec {
49 Null,
51 Deflate(DeflateSettings),
55 #[cfg(feature = "snappy")]
56 Snappy,
60 #[cfg(feature = "zstandard")]
61 Zstandard(zstandard::ZstandardSettings),
63 #[cfg(feature = "bzip")]
64 Bzip2(bzip::Bzip2Settings),
67 #[cfg(feature = "xz")]
68 Xz(xz::XzSettings),
71}
72
73impl From<Codec> for Value {
74 fn from(value: Codec) -> Self {
75 Self::Bytes(<&str>::from(value).as_bytes().to_vec())
76 }
77}
78
79impl Codec {
80 pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
82 match self {
83 Codec::Null => (),
84 Codec::Deflate(settings) => {
85 let compressed =
86 miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
87 *stream = compressed;
88 }
89 #[cfg(feature = "snappy")]
90 Codec::Snappy => {
91 let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
92 let compressed_size = snap::raw::Encoder::new()
93 .compress(&stream[..], &mut encoded[..])
94 .map_err(Error::SnappyCompress)?;
95
96 let mut hasher = crc32fast::Hasher::new();
97 hasher.update(&stream[..]);
98 let checksum = hasher.finalize();
99 let checksum_as_bytes = checksum.to_be_bytes();
100 let checksum_len = checksum_as_bytes.len();
101 encoded.truncate(compressed_size + checksum_len);
102 encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
103
104 *stream = encoded;
105 }
106 #[cfg(feature = "zstandard")]
107 Codec::Zstandard(settings) => {
108 use std::io::Write;
109 let mut encoder =
110 zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
111 encoder.write_all(stream).map_err(Error::ZstdCompress)?;
112 *stream = encoder.finish().unwrap();
113 }
114 #[cfg(feature = "bzip")]
115 Codec::Bzip2(settings) => {
116 use bzip2::read::BzEncoder;
117 use std::io::Read;
118
119 let mut encoder = BzEncoder::new(&stream[..], settings.compression());
120 let mut buffer = Vec::new();
121 encoder.read_to_end(&mut buffer).unwrap();
122 *stream = buffer;
123 }
124 #[cfg(feature = "xz")]
125 Codec::Xz(settings) => {
126 use std::io::Read;
127 use xz2::read::XzEncoder;
128
129 let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
130 let mut buffer = Vec::new();
131 encoder.read_to_end(&mut buffer).unwrap();
132 *stream = buffer;
133 }
134 };
135
136 Ok(())
137 }
138
139 pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
141 *stream = match self {
142 Codec::Null => return Ok(()),
143 Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
144 let err = {
145 use miniz_oxide::inflate::TINFLStatus::*;
146 use std::io::{Error,ErrorKind};
147 match e.status {
148 FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
149 BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), Adler32Mismatch => Error::from(ErrorKind::InvalidData),
151 Failed => Error::from(ErrorKind::InvalidData),
152 Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
153 NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
154 HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), }
156 };
157 Error::DeflateDecompress(err)
158 })?,
159 #[cfg(feature = "snappy")]
160 Codec::Snappy => {
161 let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
162 .map_err(Error::GetSnappyDecompressLen)?;
163 let mut decoded = vec![0; decompressed_size];
164 snap::raw::Decoder::new()
165 .decompress(&stream[..stream.len() - 4], &mut decoded[..])
166 .map_err(Error::SnappyDecompress)?;
167
168 let mut last_four: [u8; 4] = [0; 4];
169 last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
170 let expected: u32 = u32::from_be_bytes(last_four);
171
172 let mut hasher = crc32fast::Hasher::new();
173 hasher.update(&decoded);
174 let actual = hasher.finalize();
175
176 if expected != actual {
177 return Err(Error::SnappyCrc32 { expected, actual });
178 }
179 decoded
180 }
181 #[cfg(feature = "zstandard")]
182 Codec::Zstandard(_settings) => {
183 use std::io::BufReader;
184 use zstd::zstd_safe;
185
186 let mut decoded = Vec::new();
187 let buffer_size = zstd_safe::DCtx::in_size();
188 let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
189 let mut decoder = zstd::Decoder::new(buffer).unwrap();
190 std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
191 decoded
192 }
193 #[cfg(feature = "bzip")]
194 Codec::Bzip2(_) => {
195 use bzip2::read::BzDecoder;
196 use std::io::Read;
197
198 let mut decoder = BzDecoder::new(&stream[..]);
199 let mut decoded = Vec::new();
200 decoder.read_to_end(&mut decoded).unwrap();
201 decoded
202 }
203 #[cfg(feature = "xz")]
204 Codec::Xz(_) => {
205 use xz2::read::XzDecoder;
206 use std::io::Read;
207
208 let mut decoder = XzDecoder::new(&stream[..]);
209 let mut decoded: Vec<u8> = Vec::new();
210 decoder.read_to_end(&mut decoded).unwrap();
211 decoded
212 }
213 };
214 Ok(())
215 }
216}
217
#[cfg(feature = "bzip")]
pub mod bzip {
    use bzip2::Compression;

    /// Tunables for the bzip2 codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Bzip2Settings {
        /// Level handed to `bzip2::Compression::new`.
        pub compression_level: u8,
    }

    impl Bzip2Settings {
        /// Builds settings with the given level.
        pub fn new(compression_level: u8) -> Self {
            Bzip2Settings { compression_level }
        }

        /// Converts the stored level into the `bzip2` crate's `Compression` value.
        pub(crate) fn compression(&self) -> Compression {
            let level = u32::from(self.compression_level);
            Compression::new(level)
        }
    }

    impl Default for Bzip2Settings {
        /// Uses the level reported by `Compression::best()`.
        fn default() -> Self {
            let best = Compression::best();
            Self::new(best.level() as u8)
        }
    }
}
243
#[cfg(feature = "zstandard")]
pub mod zstandard {
    /// Tunables for the Zstandard codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ZstandardSettings {
        /// Level forwarded (widened to `i32`) to the zstd encoder.
        pub compression_level: u8,
    }

    impl ZstandardSettings {
        /// Builds settings with the given level.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }

    impl Default for ZstandardSettings {
        /// Level `0`, passed through to zstd unchanged (presumably zstd's "use the
        /// library default" level — confirm against the zstd docs).
        fn default() -> Self {
            ZstandardSettings {
                compression_level: 0,
            }
        }
    }
}
263
#[cfg(feature = "xz")]
pub mod xz {
    /// Tunables for the xz codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct XzSettings {
        /// Level forwarded (widened to `u32`) to the xz encoder.
        pub compression_level: u8,
    }

    impl XzSettings {
        /// Builds settings with the given level.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }

    impl Default for XzSettings {
        /// Level `9`.
        fn default() -> Self {
            XzSettings {
                compression_level: 9,
            }
        }
    }
}
283
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Highly repetitive sample payload, so every real codec should shrink it.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Runs `codec` forward and back over `INPUT`. Checks that compression changed and
    /// shrank the data, and that decompression restores the original bytes.
    fn round_trip(codec: Codec) -> TestResult {
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, buffer.as_slice());
        assert!(buffer.len() < INPUT.len());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Ok(())
    }

    /// Static name that `strum` assigns to `codec`.
    fn name_of(codec: Codec) -> &'static str {
        codec.into()
    }

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        let codec = Codec::Null;
        let mut buffer = Vec::from(INPUT);

        // The null codec must be an identity transform in both directions.
        codec.compress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());
        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        let settings = DeflateSettings::new(CompressionLevel::BestCompression);
        round_trip(Codec::Deflate(settings))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        round_trip(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        let settings = zstandard::ZstandardSettings::default();
        round_trip(Codec::Zstandard(settings))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        let settings = bzip::Bzip2Settings::default();
        round_trip(Codec::Bzip2(settings))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        let settings = xz::XzSettings::default();
        round_trip(Codec::Xz(settings))
    }

    #[test]
    fn codec_to_str() {
        assert_eq!(name_of(Codec::Null), "null");
        assert_eq!(name_of(Codec::Deflate(DeflateSettings::default())), "deflate");

        #[cfg(feature = "snappy")]
        assert_eq!(name_of(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            name_of(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(name_of(Codec::Bzip2(bzip::Bzip2Settings::default())), "bzip2");

        #[cfg(feature = "xz")]
        assert_eq!(name_of(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    #[test]
    fn codec_from_str() {
        assert_eq!("null".parse::<Codec>().unwrap(), Codec::Null);
        assert_eq!(
            "deflate".parse::<Codec>().unwrap(),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!("snappy".parse::<Codec>().unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            "zstandard".parse::<Codec>().unwrap(),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            "bzip2".parse::<Codec>().unwrap(),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(
            "xz".parse::<Codec>().unwrap(),
            Codec::Xz(xz::XzSettings::default())
        );

        // Unknown names must be rejected.
        assert!("not a codec".parse::<Codec>().is_err());
    }
}