apache_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for all supported compression codecs in Avro.
19use crate::{types::Value, AvroResult, Error};
20use strum_macros::{EnumIter, EnumString, IntoStaticStr};
21
22/// Settings for the `Deflate` codec.
23#[derive(Clone, Copy, Eq, PartialEq, Debug)]
24pub struct DeflateSettings {
25    compression_level: miniz_oxide::deflate::CompressionLevel,
26}
27
28impl DeflateSettings {
29    pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30        DeflateSettings { compression_level }
31    }
32
33    fn compression_level(&self) -> u8 {
34        self.compression_level as u8
35    }
36}
37
38impl Default for DeflateSettings {
39    /// Default compression level is `miniz_oxide::deflate::CompressionLevel::DefaultCompression`.
40    fn default() -> Self {
41        Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
42    }
43}
44
45/// The compression codec used to compress blocks.
46#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
47#[strum(serialize_all = "kebab_case")]
48pub enum Codec {
49    /// The `Null` codec simply passes through data uncompressed.
50    Null,
51    /// The `Deflate` codec writes the data block using the deflate algorithm
52    /// as specified in RFC 1951, and typically implemented using the zlib library.
53    /// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
54    Deflate(DeflateSettings),
55    #[cfg(feature = "snappy")]
56    /// The `Snappy` codec uses Google's [Snappy](http://google.github.io/snappy/)
57    /// compression library. Each compressed block is followed by the 4-byte, big-endian
58    /// CRC32 checksum of the uncompressed data in the block.
59    Snappy,
60    #[cfg(feature = "zstandard")]
61    /// The `Zstandard` codec uses Facebook's [Zstandard](https://facebook.github.io/zstd/)
62    Zstandard(zstandard::ZstandardSettings),
63    #[cfg(feature = "bzip")]
64    /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
65    /// compression library.
66    Bzip2(bzip::Bzip2Settings),
67    #[cfg(feature = "xz")]
68    /// The `Xz` codec uses [Xz utils](https://tukaani.org/xz/)
69    /// compression library.
70    Xz(xz::XzSettings),
71}
72
73impl From<Codec> for Value {
74    fn from(value: Codec) -> Self {
75        Self::Bytes(<&str>::from(value).as_bytes().to_vec())
76    }
77}
78
79impl Codec {
80    /// Compress a stream of bytes in-place.
81    pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
82        match self {
83            Codec::Null => (),
84            Codec::Deflate(settings) => {
85                let compressed =
86                    miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
87                *stream = compressed;
88            }
89            #[cfg(feature = "snappy")]
90            Codec::Snappy => {
91                let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
92                let compressed_size = snap::raw::Encoder::new()
93                    .compress(&stream[..], &mut encoded[..])
94                    .map_err(Error::SnappyCompress)?;
95
96                let mut hasher = crc32fast::Hasher::new();
97                hasher.update(&stream[..]);
98                let checksum = hasher.finalize();
99                let checksum_as_bytes = checksum.to_be_bytes();
100                let checksum_len = checksum_as_bytes.len();
101                encoded.truncate(compressed_size + checksum_len);
102                encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
103
104                *stream = encoded;
105            }
106            #[cfg(feature = "zstandard")]
107            Codec::Zstandard(settings) => {
108                use std::io::Write;
109                let mut encoder =
110                    zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
111                encoder.write_all(stream).map_err(Error::ZstdCompress)?;
112                *stream = encoder.finish().unwrap();
113            }
114            #[cfg(feature = "bzip")]
115            Codec::Bzip2(settings) => {
116                use bzip2::read::BzEncoder;
117                use std::io::Read;
118
119                let mut encoder = BzEncoder::new(&stream[..], settings.compression());
120                let mut buffer = Vec::new();
121                encoder.read_to_end(&mut buffer).unwrap();
122                *stream = buffer;
123            }
124            #[cfg(feature = "xz")]
125            Codec::Xz(settings) => {
126                use std::io::Read;
127                use xz2::read::XzEncoder;
128
129                let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
130                let mut buffer = Vec::new();
131                encoder.read_to_end(&mut buffer).unwrap();
132                *stream = buffer;
133            }
134        };
135
136        Ok(())
137    }
138
139    /// Decompress a stream of bytes in-place.
140    pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
141        *stream = match self {
142            Codec::Null => return Ok(()),
143            Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
144                let err = {
145                    use miniz_oxide::inflate::TINFLStatus::*;
146                    use std::io::{Error,ErrorKind};
147                    match e.status {
148                        FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
149                        BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), // not possible for _to_vec()
150                        Adler32Mismatch => Error::from(ErrorKind::InvalidData),
151                        Failed => Error::from(ErrorKind::InvalidData),
152                        Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
153                        NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
154                        HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), // not possible for _to_vec()
155                    }
156                };
157                Error::DeflateDecompress(err)
158            })?,
159            #[cfg(feature = "snappy")]
160            Codec::Snappy => {
161                let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
162                    .map_err(Error::GetSnappyDecompressLen)?;
163                let mut decoded = vec![0; decompressed_size];
164                snap::raw::Decoder::new()
165                    .decompress(&stream[..stream.len() - 4], &mut decoded[..])
166                    .map_err(Error::SnappyDecompress)?;
167
168                let mut last_four: [u8; 4] = [0; 4];
169                last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
170                let expected: u32 = u32::from_be_bytes(last_four);
171
172                let mut hasher = crc32fast::Hasher::new();
173                hasher.update(&decoded);
174                let actual = hasher.finalize();
175
176                if expected != actual {
177                    return Err(Error::SnappyCrc32 { expected, actual });
178                }
179                decoded
180            }
181            #[cfg(feature = "zstandard")]
182            Codec::Zstandard(_settings) => {
183                use std::io::BufReader;
184                use zstd::zstd_safe;
185
186                let mut decoded = Vec::new();
187                let buffer_size = zstd_safe::DCtx::in_size();
188                let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
189                let mut decoder = zstd::Decoder::new(buffer).unwrap();
190                std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
191                decoded
192            }
193            #[cfg(feature = "bzip")]
194            Codec::Bzip2(_) => {
195                use bzip2::read::BzDecoder;
196                use std::io::Read;
197
198                let mut decoder = BzDecoder::new(&stream[..]);
199                let mut decoded = Vec::new();
200                decoder.read_to_end(&mut decoded).unwrap();
201                decoded
202            }
203            #[cfg(feature = "xz")]
204            Codec::Xz(_) => {
205                use xz2::read::XzDecoder;
206                use std::io::Read;
207
208                let mut decoder = XzDecoder::new(&stream[..]);
209                let mut decoded: Vec<u8> = Vec::new();
210                decoder.read_to_end(&mut decoded).unwrap();
211                decoded
212            }
213        };
214        Ok(())
215    }
216}
217
/// Settings for the `Bzip2` codec.
#[cfg(feature = "bzip")]
pub mod bzip {
    use bzip2::Compression;

    /// Compression level configuration for the `Bzip2` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Bzip2Settings {
        /// Level passed on to `bzip2::Compression::new`.
        pub compression_level: u8,
    }

    impl Bzip2Settings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            Bzip2Settings { compression_level }
        }

        /// Builds the `bzip2::Compression` value for the stored level.
        pub(crate) fn compression(&self) -> Compression {
            let level = u32::from(self.compression_level);
            Compression::new(level)
        }
    }

    impl Default for Bzip2Settings {
        /// Uses the level reported by `bzip2::Compression::best()`.
        fn default() -> Self {
            let best = Compression::best().level();
            Self::new(best as u8)
        }
    }
}
243
/// Settings for the `Zstandard` codec.
#[cfg(feature = "zstandard")]
pub mod zstandard {
    /// Compression level configuration for the `Zstandard` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ZstandardSettings {
        /// Level handed to the Zstandard encoder.
        pub compression_level: u8,
    }

    impl ZstandardSettings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }

    impl Default for ZstandardSettings {
        /// Uses compression level `0`.
        fn default() -> Self {
            ZstandardSettings {
                compression_level: 0,
            }
        }
    }
}
263
/// Settings for the `Xz` codec.
#[cfg(feature = "xz")]
pub mod xz {
    /// Compression level configuration for the `Xz` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct XzSettings {
        /// Level handed to the xz encoder.
        pub compression_level: u8,
    }

    impl XzSettings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }

    impl Default for XzSettings {
        /// Uses compression level `9`.
        fn default() -> Self {
            Self {
                compression_level: 9,
            }
        }
    }
}
283
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Sample payload built from a repeated phrase; the round-trip helper asserts
    /// that every real codec makes it smaller.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Compresses `INPUT` with `codec`, checks that the result differs from and is
    /// shorter than the input, then decompresses and checks the input is restored.
    fn compress_and_decompress(codec: Codec) -> TestResult {
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, buffer.as_slice());
        assert!(buffer.len() < INPUT.len());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Ok(())
    }

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        let codec = Codec::Null;
        let mut buffer = Vec::from(INPUT);

        // The null codec must leave the data untouched in both directions.
        codec.compress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, buffer.as_slice());

        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        let settings = DeflateSettings::new(CompressionLevel::BestCompression);
        compress_and_decompress(Codec::Deflate(settings))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        let settings = zstandard::ZstandardSettings::default();
        compress_and_decompress(Codec::Zstandard(settings))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        let settings = bzip::Bzip2Settings::default();
        compress_and_decompress(Codec::Bzip2(settings))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        let settings = xz::XzSettings::default();
        compress_and_decompress(Codec::Xz(settings))
    }

    #[test]
    fn codec_to_str() {
        /// Converts a codec into its static string name.
        fn name_of(codec: Codec) -> &'static str {
            codec.into()
        }

        assert_eq!(name_of(Codec::Null), "null");
        assert_eq!(
            name_of(Codec::Deflate(DeflateSettings::default())),
            "deflate"
        );

        #[cfg(feature = "snappy")]
        assert_eq!(name_of(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            name_of(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            name_of(Codec::Bzip2(bzip::Bzip2Settings::default())),
            "bzip2"
        );

        #[cfg(feature = "xz")]
        assert_eq!(name_of(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    #[test]
    fn codec_from_str() {
        /// Parses a codec name, panicking when the name is not recognised.
        fn parse(name: &str) -> Codec {
            name.parse::<Codec>().unwrap()
        }

        assert_eq!(parse("null"), Codec::Null);
        assert_eq!(
            parse("deflate"),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!(parse("snappy"), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            parse("zstandard"),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            parse("bzip2"),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(parse("xz"), Codec::Xz(xz::XzSettings::default()));

        assert!("not a codec".parse::<Codec>().is_err());
    }
}