apache_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for all supported compression codecs in Avro.
19use crate::{AvroResult, Error, error::Details, types::Value};
20use strum_macros::{EnumIter, EnumString, IntoStaticStr};
21
22/// Settings for the `Deflate` codec.
23#[derive(Clone, Copy, Eq, PartialEq, Debug)]
24pub struct DeflateSettings {
25    compression_level: miniz_oxide::deflate::CompressionLevel,
26}
27
28impl DeflateSettings {
29    pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30        DeflateSettings { compression_level }
31    }
32
33    /// Get the compression level as a `u8`, note that this means the [`miniz_oxide::deflate::CompressionLevel::DefaultCompression`] variant
34    /// will appear as `255`, this is normalized by the [`miniz_oxide`] crate later.
35    pub fn compression_level(&self) -> u8 {
36        self.compression_level as u8
37    }
38}
39
40impl Default for DeflateSettings {
41    /// Default compression level is `miniz_oxide::deflate::CompressionLevel::DefaultCompression`.
42    fn default() -> Self {
43        Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
44    }
45}
46
47/// The compression codec used to compress blocks.
48#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
49#[strum(serialize_all = "kebab_case")]
50pub enum Codec {
51    /// The `Null` codec simply passes through data uncompressed.
52    Null,
53    /// The `Deflate` codec writes the data block using the deflate algorithm
54    /// as specified in RFC 1951, and typically implemented using the zlib library.
55    /// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
56    Deflate(DeflateSettings),
57    #[cfg(feature = "snappy")]
58    /// The `Snappy` codec uses Google's [Snappy](http://google.github.io/snappy/)
59    /// compression library. Each compressed block is followed by the 4-byte, big-endian
60    /// CRC32 checksum of the uncompressed data in the block.
61    Snappy,
62    #[cfg(feature = "zstandard")]
63    /// The `Zstandard` codec uses Facebook's [Zstandard](https://facebook.github.io/zstd/)
64    Zstandard(zstandard::ZstandardSettings),
65    #[cfg(feature = "bzip")]
66    /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
67    /// compression library.
68    Bzip2(bzip::Bzip2Settings),
69    #[cfg(feature = "xz")]
70    /// The `Xz` codec uses [Xz utils](https://tukaani.org/xz/)
71    /// compression library.
72    Xz(xz::XzSettings),
73}
74
75impl From<Codec> for Value {
76    fn from(value: Codec) -> Self {
77        Self::Bytes(<&str>::from(value).as_bytes().to_vec())
78    }
79}
80
81impl Codec {
82    /// Compress a stream of bytes in-place.
83    pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
84        match self {
85            Codec::Null => (),
86            Codec::Deflate(settings) => {
87                let compressed =
88                    miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
89                *stream = compressed;
90            }
91            #[cfg(feature = "snappy")]
92            Codec::Snappy => {
93                let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
94                let compressed_size = snap::raw::Encoder::new()
95                    .compress(&stream[..], &mut encoded[..])
96                    .map_err(Details::SnappyCompress)?;
97
98                let mut hasher = crc32fast::Hasher::new();
99                hasher.update(&stream[..]);
100                let checksum = hasher.finalize();
101                let checksum_as_bytes = checksum.to_be_bytes();
102                let checksum_len = checksum_as_bytes.len();
103                encoded.truncate(compressed_size + checksum_len);
104                encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
105
106                *stream = encoded;
107            }
108            #[cfg(feature = "zstandard")]
109            Codec::Zstandard(settings) => {
110                use std::io::Write;
111                let mut encoder =
112                    zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
113                encoder.write_all(stream).map_err(Details::ZstdCompress)?;
114                *stream = encoder.finish().unwrap();
115            }
116            #[cfg(feature = "bzip")]
117            Codec::Bzip2(settings) => {
118                use bzip2::read::BzEncoder;
119                use std::io::Read;
120
121                let mut encoder = BzEncoder::new(&stream[..], settings.compression());
122                let mut buffer = Vec::new();
123                encoder.read_to_end(&mut buffer).unwrap();
124                *stream = buffer;
125            }
126            #[cfg(feature = "xz")]
127            Codec::Xz(settings) => {
128                use liblzma::read::XzEncoder;
129                use std::io::Read;
130
131                let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
132                let mut buffer = Vec::new();
133                encoder.read_to_end(&mut buffer).unwrap();
134                *stream = buffer;
135            }
136        };
137
138        Ok(())
139    }
140
141    /// Decompress a stream of bytes in-place.
142    pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
143        *stream = match self {
144            Codec::Null => return Ok(()),
145            Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
146                let err = {
147                    use miniz_oxide::inflate::TINFLStatus::*;
148                    use std::io::{Error,ErrorKind};
149                    match e.status {
150                        FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
151                        BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), // not possible for _to_vec()
152                        Adler32Mismatch => Error::from(ErrorKind::InvalidData),
153                        Failed => Error::from(ErrorKind::InvalidData),
154                        Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
155                        NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
156                        HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), // not possible for _to_vec()
157                        other => Error::other(format!("Unexpected error: {other:?}"))
158                    }
159                };
160                Error::new(Details::DeflateDecompress(err))
161            })?,
162            #[cfg(feature = "snappy")]
163            Codec::Snappy => {
164                let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
165                    .map_err(Details::GetSnappyDecompressLen)?;
166                let mut decoded = vec![0; decompressed_size];
167                snap::raw::Decoder::new()
168                    .decompress(&stream[..stream.len() - 4], &mut decoded[..])
169                    .map_err(Details::SnappyDecompress)?;
170
171                let mut last_four: [u8; 4] = [0; 4];
172                last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
173                let expected: u32 = u32::from_be_bytes(last_four);
174
175                let mut hasher = crc32fast::Hasher::new();
176                hasher.update(&decoded);
177                let actual = hasher.finalize();
178
179                if expected != actual {
180                    return Err(Details::SnappyCrc32{expected, actual}.into());
181                }
182                decoded
183            }
184            #[cfg(feature = "zstandard")]
185            Codec::Zstandard(_settings) => {
186                use std::io::BufReader;
187                use zstd::zstd_safe;
188
189                let mut decoded = Vec::new();
190                let buffer_size = zstd_safe::DCtx::in_size();
191                let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
192                let mut decoder = zstd::Decoder::new(buffer).unwrap();
193                std::io::copy(&mut decoder, &mut decoded).map_err(Details::ZstdDecompress)?;
194                decoded
195            }
196            #[cfg(feature = "bzip")]
197            Codec::Bzip2(_) => {
198                use bzip2::read::BzDecoder;
199                use std::io::Read;
200
201                let mut decoder = BzDecoder::new(&stream[..]);
202                let mut decoded = Vec::new();
203                decoder.read_to_end(&mut decoded).unwrap();
204                decoded
205            }
206            #[cfg(feature = "xz")]
207            Codec::Xz(_) => {
208                use liblzma::read::XzDecoder;
209                use std::io::Read;
210
211                let mut decoder = XzDecoder::new(&stream[..]);
212                let mut decoded: Vec<u8> = Vec::new();
213                decoder.read_to_end(&mut decoded).unwrap();
214                decoded
215            }
216        };
217        Ok(())
218    }
219}
220
221#[cfg(feature = "bzip")]
222pub mod bzip {
223    use bzip2::Compression;
224
225    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
226    pub struct Bzip2Settings {
227        pub compression_level: u8,
228    }
229
230    impl Bzip2Settings {
231        pub fn new(compression_level: u8) -> Self {
232            Self { compression_level }
233        }
234
235        pub(crate) fn compression(&self) -> Compression {
236            Compression::new(self.compression_level as u32)
237        }
238    }
239
240    impl Default for Bzip2Settings {
241        fn default() -> Self {
242            Bzip2Settings::new(Compression::best().level() as u8)
243        }
244    }
245}
246
247#[cfg(feature = "zstandard")]
pub mod zstandard {
    //! Settings for the `Zstandard` codec.

    /// Settings for the `Zstandard` codec.
    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
    pub struct ZstandardSettings {
        /// Compression level handed to the zstd encoder; `0` selects zstd's default level.
        pub compression_level: u8,
    }

    impl ZstandardSettings {
        /// Builds settings for the given `compression_level`.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }

    impl Default for ZstandardSettings {
        /// Compression level `0`.
        fn default() -> Self {
            ZstandardSettings {
                compression_level: 0,
            }
        }
    }
}
266
267#[cfg(feature = "xz")]
pub mod xz {
    //! Settings for the `Xz` codec.

    /// Settings for the `Xz` codec.
    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
    pub struct XzSettings {
        /// Compression level handed to the xz encoder; defaults to `9`.
        pub compression_level: u8,
    }

    impl XzSettings {
        /// Builds settings for the given `compression_level`.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }

    impl Default for XzSettings {
        /// Compression level `9`.
        fn default() -> Self {
            XzSettings {
                compression_level: 9,
            }
        }
    }
}
286
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Highly repetitive payload, so every real codec should shrink it.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Compresses a copy of [`INPUT`] with `codec` and checks that the result differs from
    /// the input and is strictly smaller. Then decompresses it and checks that the original
    /// bytes come back.
    fn assert_round_trip(codec: Codec) -> TestResult {
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, &buffer[..]);
        assert!(INPUT.len() > buffer.len());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);
        Ok(())
    }

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        let codec = Codec::Null;
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);
        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        let settings = DeflateSettings::new(CompressionLevel::BestCompression);
        assert_round_trip(Codec::Deflate(settings))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Zstandard(zstandard::ZstandardSettings::default()))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Bzip2(bzip::Bzip2Settings::default()))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        assert_round_trip(Codec::Xz(xz::XzSettings::default()))
    }

    #[test]
    fn codec_to_str() {
        let name = |codec: Codec| -> &'static str { codec.into() };

        assert_eq!(name(Codec::Null), "null");
        assert_eq!(name(Codec::Deflate(DeflateSettings::default())), "deflate");

        #[cfg(feature = "snappy")]
        assert_eq!(name(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            name(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(name(Codec::Bzip2(bzip::Bzip2Settings::default())), "bzip2");

        #[cfg(feature = "xz")]
        assert_eq!(name(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    #[test]
    fn codec_from_str() {
        let parse = |text: &str| text.parse::<Codec>();

        assert_eq!(parse("null").unwrap(), Codec::Null);
        assert_eq!(
            parse("deflate").unwrap(),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!(parse("snappy").unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            parse("zstandard").unwrap(),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            parse("bzip2").unwrap(),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(parse("xz").unwrap(), Codec::Xz(xz::XzSettings::default()));

        assert!(parse("not a codec").is_err());
    }
}