Skip to main content

apache_avro/
codec.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for all supported compression codecs in Avro.
19use crate::{AvroResult, Error, error::Details, types::Value};
20use strum::{EnumIter, EnumString, IntoStaticStr};
21
22/// Settings for the `Deflate` codec.
23#[derive(Clone, Copy, Eq, PartialEq, Debug)]
24pub struct DeflateSettings {
25    compression_level: miniz_oxide::deflate::CompressionLevel,
26}
27
28impl DeflateSettings {
29    pub fn new(compression_level: miniz_oxide::deflate::CompressionLevel) -> Self {
30        DeflateSettings { compression_level }
31    }
32
33    /// Get the compression level as a `u8`, note that this means the [`miniz_oxide::deflate::CompressionLevel::DefaultCompression`] variant
34    /// will appear as `255`, this is normalized by the [`miniz_oxide`] crate later.
35    pub fn compression_level(&self) -> u8 {
36        self.compression_level as u8
37    }
38}
39
40impl Default for DeflateSettings {
41    /// Default compression level is `miniz_oxide::deflate::CompressionLevel::DefaultCompression`.
42    fn default() -> Self {
43        Self::new(miniz_oxide::deflate::CompressionLevel::DefaultCompression)
44    }
45}
46
47/// The compression codec used to compress blocks.
48#[derive(Clone, Copy, Debug, Eq, PartialEq, EnumIter, EnumString, IntoStaticStr)]
49#[strum(serialize_all = "kebab_case")]
50pub enum Codec {
51    /// The `Null` codec simply passes through data uncompressed.
52    Null,
53    /// The `Deflate` codec writes the data block using the deflate algorithm
54    /// as specified in RFC 1951, and typically implemented using the zlib library.
55    /// Note that this format (unlike the "zlib format" in RFC 1950) does not have a checksum.
56    Deflate(DeflateSettings),
57    #[cfg(feature = "snappy")]
58    /// The `Snappy` codec uses Google's [Snappy](http://google.github.io/snappy/)
59    /// compression library. Each compressed block is followed by the 4-byte, big-endian
60    /// CRC32 checksum of the uncompressed data in the block.
61    Snappy,
62    #[cfg(feature = "zstandard")]
63    /// The `Zstandard` codec uses Facebook's [Zstandard](https://facebook.github.io/zstd/)
64    Zstandard(zstandard::ZstandardSettings),
65    #[cfg(feature = "bzip")]
66    /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
67    /// compression library.
68    Bzip2(bzip::Bzip2Settings),
69    #[cfg(feature = "xz")]
70    /// The `Xz` codec uses [Xz utils](https://tukaani.org/xz/)
71    /// compression library.
72    Xz(xz::XzSettings),
73}
74
75impl From<Codec> for Value {
76    fn from(value: Codec) -> Self {
77        Self::Bytes(<&str>::from(value).as_bytes().to_vec())
78    }
79}
80
81impl Codec {
82    /// Compress a stream of bytes in-place.
83    pub fn compress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
84        match self {
85            Codec::Null => (),
86            Codec::Deflate(settings) => {
87                let compressed =
88                    miniz_oxide::deflate::compress_to_vec(stream, settings.compression_level());
89                *stream = compressed;
90            }
91            #[cfg(feature = "snappy")]
92            Codec::Snappy => {
93                let mut encoded: Vec<u8> = vec![0; snap::raw::max_compress_len(stream.len())];
94                let compressed_size = snap::raw::Encoder::new()
95                    .compress(&stream[..], &mut encoded[..])
96                    .map_err(Details::SnappyCompress)?;
97
98                let mut hasher = crc32fast::Hasher::new();
99                hasher.update(&stream[..]);
100                let checksum = hasher.finalize();
101                let checksum_as_bytes = checksum.to_be_bytes();
102                let checksum_len = checksum_as_bytes.len();
103                encoded.truncate(compressed_size + checksum_len);
104                encoded[compressed_size..].copy_from_slice(&checksum_as_bytes);
105
106                *stream = encoded;
107            }
108            #[cfg(feature = "zstandard")]
109            Codec::Zstandard(settings) => {
110                use std::io::Write;
111                let mut encoder = zstd::Encoder::new(Vec::new(), settings.compression_level as i32)
112                    .map_err(Details::ZstdCompress)?;
113                encoder.write_all(stream).map_err(Details::ZstdCompress)?;
114                *stream = encoder.finish().map_err(Details::ZstdCompress)?;
115            }
116            #[cfg(feature = "bzip")]
117            Codec::Bzip2(settings) => {
118                use bzip2::read::BzEncoder;
119                use std::io::Read;
120
121                let mut encoder = BzEncoder::new(&stream[..], settings.compression());
122                let mut buffer = Vec::new();
123                encoder
124                    .read_to_end(&mut buffer)
125                    .unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
126                *stream = buffer;
127            }
128            #[cfg(feature = "xz")]
129            Codec::Xz(settings) => {
130                use liblzma::read::XzEncoder;
131                use std::io::Read;
132
133                let mut encoder = XzEncoder::new(&stream[..], settings.compression_level as u32);
134                let mut buffer = Vec::new();
135                encoder
136                    .read_to_end(&mut buffer)
137                    .unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
138                *stream = buffer;
139            }
140        };
141
142        Ok(())
143    }
144
145    /// Decompress a stream of bytes in-place.
146    pub fn decompress(self, stream: &mut Vec<u8>) -> AvroResult<()> {
147        *stream = match self {
148            Codec::Null => return Ok(()),
149            Codec::Deflate(_settings) => miniz_oxide::inflate::decompress_to_vec(stream).map_err(|e| {
150                let err = {
151                    use miniz_oxide::inflate::TINFLStatus::{FailedCannotMakeProgress, BadParam, Adler32Mismatch, Failed, Done, NeedsMoreInput, HasMoreOutput};
152                    use std::io::{Error,ErrorKind};
153                    match e.status {
154                        FailedCannotMakeProgress => Error::from(ErrorKind::UnexpectedEof),
155                        BadParam => Error::other("Unexpected error: miniz_oxide reported invalid output buffer size. Please report this to avro-rs developers."), // not possible for _to_vec()
156                        Adler32Mismatch => Error::from(ErrorKind::InvalidData),
157                        Failed => Error::from(ErrorKind::InvalidData),
158                        Done => Error::other("Unexpected error: miniz_oxide reported an error with a success status. Please report this to avro-rs developers."),
159                        NeedsMoreInput => Error::from(ErrorKind::UnexpectedEof),
160                        HasMoreOutput => Error::other("Unexpected error: miniz_oxide has more data than the output buffer can hold. Please report this to avro-rs developers."), // not possible for _to_vec()
161                        other => Error::other(format!("Unexpected error: {other:?}"))
162                    }
163                };
164                Error::new(Details::DeflateDecompress(err))
165            })?,
166            #[cfg(feature = "snappy")]
167            Codec::Snappy => {
168                let decompressed_size = snap::raw::decompress_len(&stream[..stream.len() - 4])
169                    .map_err(Details::GetSnappyDecompressLen)?;
170                let mut decoded = vec![0; decompressed_size];
171                snap::raw::Decoder::new()
172                    .decompress(&stream[..stream.len() - 4], &mut decoded[..])
173                    .map_err(Details::SnappyDecompress)?;
174
175                let mut last_four: [u8; 4] = [0; 4];
176                last_four.copy_from_slice(&stream[(stream.len() - 4)..]);
177                let expected: u32 = u32::from_be_bytes(last_four);
178
179                let mut hasher = crc32fast::Hasher::new();
180                hasher.update(&decoded);
181                let actual = hasher.finalize();
182
183                if expected != actual {
184                    return Err(Details::SnappyCrc32{expected, actual}.into());
185                }
186                decoded
187            }
188            #[cfg(feature = "zstandard")]
189            Codec::Zstandard(_settings) => {
190                use std::io::BufReader;
191                use zstd::zstd_safe;
192
193                let mut decoded = Vec::new();
194                let buffer_size = zstd_safe::DCtx::in_size();
195                let buffer = BufReader::with_capacity(buffer_size, &stream[..]);
196                let mut decoder = zstd::Decoder::new(buffer).map_err(Details::ZstdDecompress)?;
197                std::io::copy(&mut decoder, &mut decoded).map_err(Details::ZstdDecompress)?;
198                decoded
199            }
200            #[cfg(feature = "bzip")]
201            Codec::Bzip2(_) => {
202                use bzip2::read::BzDecoder;
203                use std::io::Read;
204
205                let mut decoder = BzDecoder::new(&stream[..]);
206                let mut decoded = Vec::new();
207                decoder.read_to_end(&mut decoded).unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
208                decoded
209            }
210            #[cfg(feature = "xz")]
211            Codec::Xz(_) => {
212                use liblzma::read::XzDecoder;
213                use std::io::Read;
214
215                let mut decoder = XzDecoder::new(&stream[..]);
216                let mut decoded: Vec<u8> = Vec::new();
217                decoder.read_to_end(&mut decoded).unwrap_or_else(|_| unreachable!("No I/O errors possible with Vec<u8>"));
218                decoded
219            }
220        };
221        Ok(())
222    }
223}
224
/// Settings for the `Bzip2` codec.
#[cfg(feature = "bzip")]
pub mod bzip {
    use bzip2::Compression;

    /// Compression settings for the `Bzip2` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct Bzip2Settings {
        /// Level converted into a [`bzip2::Compression`] when compressing.
        pub compression_level: u8,
    }

    impl Bzip2Settings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            Bzip2Settings { compression_level }
        }

        /// Builds the [`bzip2::Compression`] value for the stored level.
        pub(crate) fn compression(&self) -> Compression {
            let level = u32::from(self.compression_level);
            Compression::new(level)
        }
    }

    impl Default for Bzip2Settings {
        /// Defaults to `bzip2::Compression::best()`.
        fn default() -> Self {
            let best = Compression::best().level();
            Self::new(best as u8)
        }
    }
}
250
/// Settings for the `Zstandard` codec.
#[cfg(feature = "zstandard")]
pub mod zstandard {
    /// Compression settings for the `Zstandard` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct ZstandardSettings {
        /// Level passed (as an `i32`) to the zstd encoder when compressing.
        pub compression_level: u8,
    }

    impl ZstandardSettings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            ZstandardSettings { compression_level }
        }
    }

    impl Default for ZstandardSettings {
        /// Defaults to level `0`, which zstd interprets as "use the library's default level".
        fn default() -> Self {
            ZstandardSettings::new(0)
        }
    }
}
270
/// Settings for the `Xz` codec.
#[cfg(feature = "xz")]
pub mod xz {
    /// Compression settings for the `Xz` codec.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct XzSettings {
        /// Level passed (as a `u32`) to the xz encoder when compressing.
        /// NOTE(review): presumably a liblzma preset in `0..=9` — confirm valid range.
        pub compression_level: u8,
    }

    impl XzSettings {
        /// Creates settings with the given compression level.
        pub fn new(compression_level: u8) -> Self {
            XzSettings { compression_level }
        }
    }

    impl Default for XzSettings {
        /// Defaults to level `9`.
        fn default() -> Self {
            Self::new(9)
        }
    }
}
290
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use miniz_oxide::deflate::CompressionLevel;
    use pretty_assertions::{assert_eq, assert_ne};

    const INPUT: &[u8] = b"theanswertolifetheuniverseandeverythingis42theanswertolifetheuniverseandeverythingis4theanswertolifetheuniverseandeverythingis2";

    #[test]
    fn null_compress_and_decompress() -> TestResult {
        let codec = Codec::Null;
        let mut stream = INPUT.to_vec();
        codec.compress(&mut stream)?;
        assert_eq!(INPUT, stream.as_slice());
        codec.decompress(&mut stream)?;
        assert_eq!(INPUT, stream.as_slice());
        Ok(())
    }

    #[test]
    fn deflate_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Deflate(DeflateSettings::new(
            CompressionLevel::BestCompression,
        )))
    }

    #[cfg(feature = "snappy")]
    #[test]
    fn snappy_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Snappy)
    }

    #[cfg(feature = "zstandard")]
    #[test]
    fn zstd_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Zstandard(zstandard::ZstandardSettings::default()))
    }

    #[cfg(feature = "bzip")]
    #[test]
    fn bzip_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Bzip2(bzip::Bzip2Settings::default()))
    }

    #[cfg(feature = "xz")]
    #[test]
    fn xz_compress_and_decompress() -> TestResult {
        compress_and_decompress(Codec::Xz(xz::XzSettings::default()))
    }

    /// Round-trips `INPUT` through `codec`, checking the compressed form is actually smaller.
    fn compress_and_decompress(codec: Codec) -> TestResult {
        let mut stream = INPUT.to_vec();
        codec.compress(&mut stream)?;
        assert_ne!(INPUT, stream.as_slice());
        assert!(INPUT.len() > stream.len());
        codec.decompress(&mut stream)?;
        assert_eq!(INPUT, stream.as_slice());
        Ok(())
    }

    /// Empty blocks must survive a compress/decompress cycle unchanged.
    #[test]
    fn empty_input_round_trips() -> TestResult {
        for codec in [Codec::Null, Codec::Deflate(DeflateSettings::default())] {
            let mut stream = Vec::new();
            codec.compress(&mut stream)?;
            codec.decompress(&mut stream)?;
            assert!(stream.is_empty(), "{codec:?} did not round-trip empty input");
        }
        Ok(())
    }

    #[test]
    fn codec_to_str() {
        assert_eq!(<&str>::from(Codec::Null), "null");
        assert_eq!(
            <&str>::from(Codec::Deflate(DeflateSettings::default())),
            "deflate"
        );

        #[cfg(feature = "snappy")]
        assert_eq!(<&str>::from(Codec::Snappy), "snappy");

        #[cfg(feature = "zstandard")]
        assert_eq!(
            <&str>::from(Codec::Zstandard(zstandard::ZstandardSettings::default())),
            "zstandard"
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            <&str>::from(Codec::Bzip2(bzip::Bzip2Settings::default())),
            "bzip2"
        );

        #[cfg(feature = "xz")]
        assert_eq!(<&str>::from(Codec::Xz(xz::XzSettings::default())), "xz");
    }

    #[test]
    fn codec_from_str() {
        use std::str::FromStr;

        assert_eq!(Codec::from_str("null").unwrap(), Codec::Null);
        assert_eq!(
            Codec::from_str("deflate").unwrap(),
            Codec::Deflate(DeflateSettings::default())
        );

        #[cfg(feature = "snappy")]
        assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);

        #[cfg(feature = "zstandard")]
        assert_eq!(
            Codec::from_str("zstandard").unwrap(),
            Codec::Zstandard(zstandard::ZstandardSettings::default())
        );

        #[cfg(feature = "bzip")]
        assert_eq!(
            Codec::from_str("bzip2").unwrap(),
            Codec::Bzip2(bzip::Bzip2Settings::default())
        );

        #[cfg(feature = "xz")]
        assert_eq!(
            Codec::from_str("xz").unwrap(),
            Codec::Xz(xz::XzSettings::default())
        );

        assert!(Codec::from_str("not a codec").is_err());
    }

    /// Every enabled variant must parse back from its own name, so a newly added
    /// feature-gated codec cannot silently break the name mapping.
    #[test]
    fn codec_name_round_trips_for_every_variant() {
        use std::str::FromStr;
        use strum::IntoEnumIterator;

        for codec in Codec::iter() {
            let name: &'static str = codec.into();
            assert_eq!(Codec::from_str(name).unwrap(), codec);
        }
    }

    /// `From<Codec> for Value` stores the codec name as raw bytes.
    #[test]
    fn codec_into_value_is_name_bytes() {
        assert_eq!(Value::from(Codec::Null), Value::Bytes(b"null".to_vec()));
        assert_eq!(
            Value::from(Codec::Deflate(DeflateSettings::default())),
            Value::Bytes(b"deflate".to_vec())
        );
    }
}