1use crate::{AvroResult, Error, error::Details, types::Value};
20use strum_macros::{EnumIter, EnumString, IntoStaticStr};
21
22#[derive(Clone, Copy, Eq, PartialEq, Debug)]
24pub struct DeflateSettings {
25 compression_level: miniz_oxide::deflate::CompressionLevel,
26}
27
28impl DeflateSettings {
29 pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30 DeflateSettings { compression_level }
31 }
32
33 pub fn compression_level(&self) -> u8 {
36 self.compression_level as u8
37 }
38}
39
40impl Default for DeflateSettings {
41 fn default() -> Self {
43 Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
44 }
45}
46
47#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
49#[strum(serialize_all = "kebab_case")]
50pub enum Codec {
51 Null,
53 Deflate(DeflateSettings),
57 #[cfg(feature = "snappy")]
58 Snappy,
62 #[cfg(feature = "zstandard")]
63 Zstandard(zstandard::ZstandardSettings),
65 #[cfg(feature = "bzip")]
66 Bzip2(bzip::Bzip2Settings),
69 #[cfg(feature = "xz")]
70 Xz(xz::XzSettings),
73}
74
75impl From<Codec> for Value {
76 fn from(value: Codec) -> Self {
77 Self::Bytes(<&str>::from(value).as_bytes().to_vec())
78 }
79}
80
81impl Codec {
82 pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
84 match self {
85 Codec::Null => (),
86 Codec::Deflate(settings) => {
87 let compressed =
88 miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
89 *stream = compressed;
90 }
91 #[cfg(feature = "snappy")]
92 Codec::Snappy => {
93 let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
94 let compressed_size = snap::raw::Encoder::new()
95 .compress(&stream[..], &mut encoded[..])
96 .map_err(Details::SnappyCompress)?;
97
98 let mut hasher = crc32fast::Hasher::new();
99 hasher.update(&stream[..]);
100 let checksum = hasher.finalize();
101 let checksum_as_bytes = checksum.to_be_bytes();
102 let checksum_len = checksum_as_bytes.len();
103 encoded.truncate(compressed_size + checksum_len);
104 encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
105
106 *stream = encoded;
107 }
108 #[cfg(feature = "zstandard")]
109 Codec::Zstandard(settings) => {
110 use std::io::Write;
111 let mut encoder =
112 zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
113 encoder.write_all(stream).map_err(Details::ZstdCompress)?;
114 *stream = encoder.finish().unwrap();
115 }
116 #[cfg(feature = "bzip")]
117 Codec::Bzip2(settings) => {
118 use bzip2::read::BzEncoder;
119 use std::io::Read;
120
121 let mut encoder = BzEncoder::new(&stream[..], settings.compression());
122 let mut buffer = Vec::new();
123 encoder.read_to_end(&mut buffer).unwrap();
124 *stream = buffer;
125 }
126 #[cfg(feature = "xz")]
127 Codec::Xz(settings) => {
128 use liblzma::read::XzEncoder;
129 use std::io::Read;
130
131 let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
132 let mut buffer = Vec::new();
133 encoder.read_to_end(&mut buffer).unwrap();
134 *stream = buffer;
135 }
136 };
137
138 Ok(())
139 }
140
141 pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
143 *stream = match self {
144 Codec::Null => return Ok(()),
145 Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
146 let err = {
147 use miniz_oxide::inflate::TINFLStatus::*;
148 use std::io::{Error,ErrorKind};
149 match e.status {
150 FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
151 BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), Adler32Mismatch => Error::from(ErrorKind::InvalidData),
153 Failed => Error::from(ErrorKind::InvalidData),
154 Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
155 NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
156 HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), }
158 };
159 Error::new(Details::DeflateDecompress(err))
160 })?,
161 #[cfg(feature = "snappy")]
162 Codec::Snappy => {
163 let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
164 .map_err(Details::GetSnappyDecompressLen)?;
165 let mut decoded = vec![0; decompressed_size];
166 snap::raw::Decoder::new()
167 .decompress(&stream[..stream.len() - 4], &mut decoded[..])
168 .map_err(Details::SnappyDecompress)?;
169
170 let mut last_four: [u8; 4] = [0; 4];
171 last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
172 let expected: u32 = u32::from_be_bytes(last_four);
173
174 let mut hasher = crc32fast::Hasher::new();
175 hasher.update(&decoded);
176 let actual = hasher.finalize();
177
178 if expected != actual {
179 return Err(Details::SnappyCrc32{expected, actual}.into());
180 }
181 decoded
182 }
183 #[cfg(feature = "zstandard")]
184 Codec::Zstandard(_settings) => {
185 use std::io::BufReader;
186 use zstd::zstd_safe;
187
188 let mut decoded = Vec::new();
189 let buffer_size = zstd_safe::DCtx::in_size();
190 let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
191 let mut decoder = zstd::Decoder::new(buffer).unwrap();
192 std::io::copy(&mut decoder, &mut decoded).map_err(Details::ZstdDecompress)?;
193 decoded
194 }
195 #[cfg(feature = "bzip")]
196 Codec::Bzip2(_) => {
197 use bzip2::read::BzDecoder;
198 use std::io::Read;
199
200 let mut decoder = BzDecoder::new(&stream[..]);
201 let mut decoded = Vec::new();
202 decoder.read_to_end(&mut decoded).unwrap();
203 decoded
204 }
205 #[cfg(feature = "xz")]
206 Codec::Xz(_) => {
207 use liblzma::read::XzDecoder;
208 use std::io::Read;
209
210 let mut decoder = XzDecoder::new(&stream[..]);
211 let mut decoded: Vec<u8> = Vec::new();
212 decoder.read_to_end(&mut decoded).unwrap();
213 decoded
214 }
215 };
216 Ok(())
217 }
218}
219
#[cfg(feature = "bzip")]
pub mod bzip {
    use bzip2::Compression;

    /// Settings for the `Bzip2` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Bzip2Settings {
        /// Numeric level that is passed to `bzip2::Compression::new`.
        pub compression_level: u8,
    }

    impl Bzip2Settings {
        /// Creates settings with the given numeric compression level.
        pub fn new(compression_level: u8) -> Self {
            Bzip2Settings { compression_level }
        }

        /// Converts the stored level into the `bzip2` crate's `Compression`.
        pub(crate) fn compression(&self) -> Compression {
            let level = u32::from(self.compression_level);
            Compression::new(level)
        }
    }

    impl Default for Bzip2Settings {
        /// Uses the level reported by `Compression::best()`.
        fn default() -> Self {
            let best = Compression::best().level();
            Self::new(best as u8)
        }
    }
}
245
#[cfg(feature = "zstandard")]
pub mod zstandard {
    /// Settings for the `Zstandard` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ZstandardSettings {
        /// Numeric level handed to the zstd encoder.
        pub compression_level: u8,
    }

    impl ZstandardSettings {
        /// Creates settings with the given numeric compression level.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }

    impl Default for ZstandardSettings {
        /// Uses level `0`.
        // NOTE(review): presumably zstd interprets 0 as "library default" —
        // confirm against the zstd crate documentation.
        fn default() -> Self {
            ZstandardSettings::new(0)
        }
    }
}
265
#[cfg(feature = "xz")]
pub mod xz {
    /// Settings for the `Xz` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct XzSettings {
        /// Numeric level handed to the xz encoder.
        pub compression_level: u8,
    }

    impl XzSettings {
        /// Creates settings with the given numeric compression level.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }

    impl Default for XzSettings {
        /// Uses level `9`.
        fn default() -> Self {
            Self::new(9)
        }
    }
}
285
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Repetitive sample payload, so every real codec can shrink it.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Round-trips `INPUT` through `codec`, checking that compression changes
    /// and shrinks the data and that decompression restores it.
    fn compress_and_decompress(codec: Codec) -> TestResult {
        let mut stream = Vec::from(INPUT);

        codec.compress(&mut stream)?;
        assert_ne!(INPUT, stream.as_slice());
        assert!(INPUT.len() > stream.len());

        codec.decompress(&mut stream)?;
        assert_eq!(INPUT, stream.as_slice());

        Ok(())
    }

    /// The null codec must leave the data unchanged in both directions.
    #[test]
    fn null_compress_and_decompress() -> TestResult {
        let codec = Codec::Null;
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        let settings = DeflateSettings::new(CompressionLevel::BestCompression);
        compress_and_decompress(Codec::Deflate(settings))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        let settings = zstandard::ZstandardSettings::default();
        compress_and_decompress(Codec::Zstandard(settings))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        let settings = bzip::Bzip2Settings::default();
        compress_and_decompress(Codec::Bzip2(settings))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        let settings = xz::XzSettings::default();
        compress_and_decompress(Codec::Xz(settings))
    }

    /// Every codec converts to its kebab-case string name.
    #[test]
    fn codec_to_str() {
        let name_of = |codec: Codec| <&'static str>::from(codec);

        assert_eq!(name_of(Codec::Null), "null");
        assert_eq!(name_of(Codec::Deflate(DeflateSettings::default())), "deflate");

        #[cfg(feature = "snappy")]
        assert_eq!(name_of(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            name_of(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            name_of(Codec::Bzip2(bzip::Bzip2Settings::default())),
            "bzip2"
        );

        #[cfg(feature = "xz")]
        assert_eq!(name_of(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    /// Every codec name parses back to the codec with default settings, and
    /// unknown names are rejected.
    #[test]
    fn codec_from_str() {
        let parse = |name: &str| name.parse::<Codec>();

        assert_eq!(parse("null").unwrap(), Codec::Null);
        assert_eq!(
            parse("deflate").unwrap(),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!(parse("snappy").unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            parse("zstandard").unwrap(),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            parse("bzip2").unwrap(),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(parse("xz").unwrap(), Codec::Xz(xz::XzSettings::default()));

        assert!(parse("not a codec").is_err());
    }
}