apache_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for all supported compression codecs in Avro.
19use crate::{AvroResult, Error, error::Details, types::Value};
20use strum_macros::{EnumIter, EnumString, IntoStaticStr};
21
/// Settings for the `Deflate` codec.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct DeflateSettings {
    /// Compression level handed to `miniz_oxide` when a block is deflated.
    compression_level: miniz_oxide::deflate::CompressionLevel,
}
27
28impl DeflateSettings {
29    pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30        DeflateSettings { compression_level }
31    }
32
33    /// Get the compression level as a `u8`, note that this means the [`miniz_oxide::deflate::CompressionLevel::DefaultCompression`] variant
34    /// will appear as `255`, this is normalized by the [`miniz_oxide`] crate later.
35    pub fn compression_level(&self) -> u8 {
36        self.compression_level as u8
37    }
38}
39
40impl Default for DeflateSettings {
41    /// Default compression level is `miniz_oxide::deflate::CompressionLevel::DefaultCompression`.
42    fn default() -> Self {
43        Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
44    }
45}
46
/// The compression codec used to compress blocks.
///
/// Each variant converts to and from its kebab-case name (for example `"null"`,
/// `"deflate"` or `"bzip2"`). Parsing a name yields the variant with its default settings.
#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub enum Codec {
    /// The `Null` codec simply passes through data uncompressed.
    Null,
    /// The `Deflate` codec writes the data block using the deflate algorithm
    /// as specified in RFC 1951, and typically implemented using the zlib library.
    /// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
    Deflate(DeflateSettings),
    #[cfg(feature = "snappy")]
    /// The `Snappy` codec uses Google's [Snappy](http://google.github.io/snappy/)
    /// compression library. Each compressed block is followed by the 4-byte, big-endian
    /// CRC32 checksum of the uncompressed data in the block.
    Snappy,
    #[cfg(feature = "zstandard")]
    /// The `Zstandard` codec uses Facebook's [Zstandard](https://facebook.github.io/zstd/)
    /// compression library.
    Zstandard(zstandard::ZstandardSettings),
    #[cfg(feature = "bzip")]
    /// The `BZip2` codec uses the [BZip2](https://sourceware.org/bzip2/)
    /// compression library.
    Bzip2(bzip::Bzip2Settings),
    #[cfg(feature = "xz")]
    /// The `Xz` codec uses the [Xz utils](https://tukaani.org/xz/)
    /// compression library.
    Xz(xz::XzSettings),
}
74
75impl From<Codec> for Value {
76    fn from(value: Codec) -> Self {
77        Self::Bytes(<&str>::from(value).as_bytes().to_vec())
78    }
79}
80
81impl Codec {
82    /// Compress a stream of bytes in-place.
83    pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
84        match self {
85            Codec::Null => (),
86            Codec::Deflate(settings) => {
87                let compressed =
88                    miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
89                *stream = compressed;
90            }
91            #[cfg(feature = "snappy")]
92            Codec::Snappy => {
93                let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
94                let compressed_size = snap::raw::Encoder::new()
95                    .compress(&stream[..], &mut encoded[..])
96                    .map_err(Details::SnappyCompress)?;
97
98                let mut hasher = crc32fast::Hasher::new();
99                hasher.update(&stream[..]);
100                let checksum = hasher.finalize();
101                let checksum_as_bytes = checksum.to_be_bytes();
102                let checksum_len = checksum_as_bytes.len();
103                encoded.truncate(compressed_size + checksum_len);
104                encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
105
106                *stream = encoded;
107            }
108            #[cfg(feature = "zstandard")]
109            Codec::Zstandard(settings) => {
110                use std::io::Write;
111                let mut encoder =
112                    zstd::Encoder::new(Vec::new(), settings.compression_level as i32).unwrap();
113                encoder.write_all(stream).map_err(Details::ZstdCompress)?;
114                *stream = encoder.finish().unwrap();
115            }
116            #[cfg(feature = "bzip")]
117            Codec::Bzip2(settings) => {
118                use bzip2::read::BzEncoder;
119                use std::io::Read;
120
121                let mut encoder = BzEncoder::new(&stream[..], settings.compression());
122                let mut buffer = Vec::new();
123                encoder.read_to_end(&mut buffer).unwrap();
124                *stream = buffer;
125            }
126            #[cfg(feature = "xz")]
127            Codec::Xz(settings) => {
128                use liblzma::read::XzEncoder;
129                use std::io::Read;
130
131                let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
132                let mut buffer = Vec::new();
133                encoder.read_to_end(&mut buffer).unwrap();
134                *stream = buffer;
135            }
136        };
137
138        Ok(())
139    }
140
141    /// Decompress a stream of bytes in-place.
142    pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
143        *stream = match self {
144            Codec::Null => return Ok(()),
145            Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
146                let err = {
147                    use miniz_oxide::inflate::TINFLStatus::*;
148                    use std::io::{Error,ErrorKind};
149                    match e.status {
150                        FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
151                        BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), // not possible for _to_vec()
152                        Adler32Mismatch => Error::from(ErrorKind::InvalidData),
153                        Failed => Error::from(ErrorKind::InvalidData),
154                        Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
155                        NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
156                        HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), // not possible for _to_vec()
157                    }
158                };
159                Error::new(Details::DeflateDecompress(err))
160            })?,
161            #[cfg(feature = "snappy")]
162            Codec::Snappy => {
163                let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
164                    .map_err(Details::GetSnappyDecompressLen)?;
165                let mut decoded = vec![0; decompressed_size];
166                snap::raw::Decoder::new()
167                    .decompress(&stream[..stream.len() - 4], &mut decoded[..])
168                    .map_err(Details::SnappyDecompress)?;
169
170                let mut last_four: [u8; 4] = [0; 4];
171                last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
172                let expected: u32 = u32::from_be_bytes(last_four);
173
174                let mut hasher = crc32fast::Hasher::new();
175                hasher.update(&decoded);
176                let actual = hasher.finalize();
177
178                if expected != actual {
179                    return Err(Details::SnappyCrc32{expected, actual}.into());
180                }
181                decoded
182            }
183            #[cfg(feature = "zstandard")]
184            Codec::Zstandard(_settings) => {
185                use std::io::BufReader;
186                use zstd::zstd_safe;
187
188                let mut decoded = Vec::new();
189                let buffer_size = zstd_safe::DCtx::in_size();
190                let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
191                let mut decoder = zstd::Decoder::new(buffer).unwrap();
192                std::io::copy(&mut decoder, &mut decoded).map_err(Details::ZstdDecompress)?;
193                decoded
194            }
195            #[cfg(feature = "bzip")]
196            Codec::Bzip2(_) => {
197                use bzip2::read::BzDecoder;
198                use std::io::Read;
199
200                let mut decoder = BzDecoder::new(&stream[..]);
201                let mut decoded = Vec::new();
202                decoder.read_to_end(&mut decoded).unwrap();
203                decoded
204            }
205            #[cfg(feature = "xz")]
206            Codec::Xz(_) => {
207                use liblzma::read::XzDecoder;
208                use std::io::Read;
209
210                let mut decoder = XzDecoder::new(&stream[..]);
211                let mut decoded: Vec<u8> = Vec::new();
212                decoder.read_to_end(&mut decoded).unwrap();
213                decoded
214            }
215        };
216        Ok(())
217    }
218}
219
#[cfg(feature = "bzip")]
pub mod bzip {
    use bzip2::Compression;

    /// Settings for the `Bzip2` codec.
    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
    pub struct Bzip2Settings {
        /// Numeric compression level forwarded to [`Compression::new`].
        pub compression_level: u8,
    }

    impl Default for Bzip2Settings {
        /// Uses the level reported by [`Compression::best`].
        fn default() -> Self {
            let best = Compression::best().level();
            Self {
                compression_level: best as u8,
            }
        }
    }

    impl Bzip2Settings {
        /// Create settings with the given numeric compression level.
        pub fn new(compression_level: u8) -> Self {
            Self { compression_level }
        }

        /// Build the `bzip2` [`Compression`] value for the configured level.
        pub(crate) fn compression(&self) -> Compression {
            let level = u32::from(self.compression_level);
            Compression::new(level)
        }
    }
}
245
#[cfg(feature = "zstandard")]
pub mod zstandard {
    /// Settings for the `Zstandard` codec.
    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
    pub struct ZstandardSettings {
        /// Numeric compression level passed to the Zstandard encoder.
        pub compression_level: u8,
    }

    impl Default for ZstandardSettings {
        /// The default compression level is `0`.
        fn default() -> Self {
            Self {
                compression_level: 0,
            }
        }
    }

    impl ZstandardSettings {
        /// Create settings with the given numeric compression level.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }
}
265
#[cfg(feature = "xz")]
pub mod xz {
    /// Settings for the `Xz` codec.
    #[derive(Clone, Copy, Eq, PartialEq, Debug)]
    pub struct XzSettings {
        /// Numeric compression level passed to the Xz encoder.
        pub compression_level: u8,
    }

    impl Default for XzSettings {
        /// The default compression level is `9`.
        fn default() -> Self {
            Self {
                compression_level: 9,
            }
        }
    }

    impl XzSettings {
        /// Create settings with the given numeric compression level.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }
}
285
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    /// Repetitive payload, so every real codec is expected to shrink it.
    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    /// Round-trips `INPUT` through `codec`: the compressed form must differ from and be
    /// shorter than the input, and decompressing must restore the input exactly.
    fn compress_and_decompress(codec: Codec) -> TestResult {
        let mut buffer = Vec::from(INPUT);

        codec.compress(&mut buffer)?;
        assert_ne!(INPUT, &buffer[..]);
        assert!(buffer.len() < INPUT.len());

        codec.decompress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);

        Ok(())
    }

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        // The null codec is a pass-through in both directions.
        let mut buffer = Vec::from(INPUT);

        Codec::Null.compress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);

        Codec::Null.decompress(&mut buffer)?;
        assert_eq!(INPUT, &buffer[..]);

        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        let settings = DeflateSettings::new(CompressionLevel::BestCompression);
        compress_and_decompress(Codec::Deflate(settings))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        let settings = zstandard::ZstandardSettings::default();
        compress_and_decompress(Codec::Zstandard(settings))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        let settings = bzip::Bzip2Settings::default();
        compress_and_decompress(Codec::Bzip2(settings))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        let settings = xz::XzSettings::default();
        compress_and_decompress(Codec::Xz(settings))
    }

    #[test]
    fn codec_to_str() {
        /// Name of `codec` as produced by its `&'static str` conversion.
        fn name_of(codec: Codec) -> &'static str {
            codec.into()
        }

        assert_eq!(name_of(Codec::Null), "null");
        assert_eq!(
            name_of(Codec::Deflate(DeflateSettings::default())),
            "deflate"
        );

        #[cfg(feature = "snappy")]
        assert_eq!(name_of(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            name_of(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            name_of(Codec::Bzip2(bzip::Bzip2Settings::default())),
            "bzip2"
        );

        #[cfg(feature = "xz")]
        assert_eq!(name_of(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    #[test]
    fn codec_from_str() {
        // Parsing a codec name yields the variant carrying its default settings.
        assert_eq!("null".parse::<Codec>().unwrap(), Codec::Null);
        assert_eq!(
            "deflate".parse::<Codec>().unwrap(),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!("snappy".parse::<Codec>().unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            "zstandard".parse::<Codec>().unwrap(),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            "bzip2".parse::<Codec>().unwrap(),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(
            "xz".parse::<Codec>().unwrap(),
            Codec::Xz(xz::XzSettings::default())
        );

        assert!("not a codec".parse::<Codec>().is_err());
    }
}