apache_avro/
util.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{schema::Documentation, AvroResult, Error};
19use serde_json::{Map, Value};
20use std::{
21    io::{Read, Write},
22    sync::{
23        atomic::{AtomicBool, AtomicUsize, Ordering},
24        Once,
25    },
26};
27
/// Maximum number of bytes that can be allocated when decoding
/// Avro-encoded values. This is a protection against ill-formed
/// data, whose length field might be interpreted as enormous.
/// See [`max_allocation_bytes`] to change this limit.
pub const DEFAULT_MAX_ALLOCATION_BYTES: usize = 512 * 1024 * 1024;
// The limit currently in effect. Starts at the default and is overwritten at
// most once, from inside `max_allocation_bytes`.
static MAX_ALLOCATION_BYTES: AtomicUsize = AtomicUsize::new(DEFAULT_MAX_ALLOCATION_BYTES);
// Ensures the store into `MAX_ALLOCATION_BYTES` happens a single time.
static MAX_ALLOCATION_BYTES_ONCE: Once = Once::new();
35
/// Whether to set serialization & deserialization traits
/// as `human_readable` or not. Defaults to `true`.
/// See [set_serde_human_readable] to change this value.
// crate-visible for testing
pub(crate) static SERDE_HUMAN_READABLE: AtomicBool = AtomicBool::new(true);
// Ensures `set_serde_human_readable` stores into the flag a single time.
static SERDE_HUMAN_READABLE_ONCE: Once = Once::new();
42
/// Convenience accessors for reading string-valued entries out of a keyed map.
pub trait MapHelper {
    /// Returns the string stored under `key` as an owned `String`, or `None`
    /// when the implementor cannot provide one.
    fn string(&self, key: &str) -> Option<String>;

    /// Shorthand for `self.string("name")`.
    fn name(&self) -> Option<String> {
        self.string("name")
    }

    /// Shorthand for `self.string("doc")`, returned as a [`Documentation`].
    fn doc(&self) -> Documentation {
        self.string("doc")
    }

    /// Returns the list of aliases, or `None` when the implementor cannot provide one.
    fn aliases(&self) -> Option<Vec<String>>;
}
56
57impl MapHelper for Map<String, Value> {
58    fn string(&self, key: &str) -> Option<String> {
59        self.get(key)
60            .and_then(|v| v.as_str())
61            .map(|v| v.to_string())
62    }
63
64    fn aliases(&self) -> Option<Vec<String>> {
65        // FIXME no warning when aliases aren't a json array of json strings
66        self.get("aliases")
67            .and_then(|aliases| aliases.as_array())
68            .and_then(|aliases| {
69                aliases
70                    .iter()
71                    .map(|alias| alias.as_str())
72                    .map(|alias| alias.map(|a| a.to_string()))
73                    .collect::<Option<_>>()
74            })
75    }
76}
77
/// Reads one zig-zag, variable-length encoded `i64` from `reader`.
///
/// This simply delegates to [`zag_i64`] and returns whatever it returns,
/// including its errors.
pub fn read_long<R: Read>(reader: &mut R) -> AvroResult<i64> {
    zag_i64(reader)
}
81
82pub fn zig_i32<W: Write>(n: i32, buffer: W) -> AvroResult<usize> {
83    zig_i64(n as i64, buffer)
84}
85
86pub fn zig_i64<W: Write>(n: i64, writer: W) -> AvroResult<usize> {
87    encode_variable(((n << 1) ^ (n >> 63)) as u64, writer)
88}
89
90pub fn zag_i32<R: Read>(reader: &mut R) -> AvroResult<i32> {
91    let i = zag_i64(reader)?;
92    i32::try_from(i).map_err(|e| Error::ZagI32(e, i))
93}
94
95pub fn zag_i64<R: Read>(reader: &mut R) -> AvroResult<i64> {
96    let z = decode_variable(reader)?;
97    Ok(if z & 0x1 == 0 {
98        (z >> 1) as i64
99    } else {
100        !(z >> 1) as i64
101    })
102}
103
104fn encode_variable<W: Write>(mut z: u64, mut writer: W) -> AvroResult<usize> {
105    let mut buffer = [0u8; 10];
106    let mut i: usize = 0;
107    loop {
108        if z <= 0x7F {
109            buffer[i] = (z & 0x7F) as u8;
110            i += 1;
111            break;
112        } else {
113            buffer[i] = (0x80 | (z & 0x7F)) as u8;
114            i += 1;
115            z >>= 7;
116        }
117    }
118    writer.write(&buffer[..i]).map_err(Error::WriteBytes)
119}
120
121fn decode_variable<R: Read>(reader: &mut R) -> AvroResult<u64> {
122    let mut i = 0u64;
123    let mut buf = [0u8; 1];
124
125    let mut j = 0;
126    loop {
127        if j > 9 {
128            // if j * 7 > 64
129            return Err(Error::IntegerOverflow);
130        }
131        reader
132            .read_exact(&mut buf[..])
133            .map_err(Error::ReadVariableIntegerBytes)?;
134        i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
135        if (buf[0] >> 7) == 0 {
136            break;
137        } else {
138            j += 1;
139        }
140    }
141
142    Ok(i)
143}
144
145/// Set a new maximum number of bytes that can be allocated when decoding data.
146/// Once called, the limit cannot be changed.
147///
148/// **NOTE** This function must be called before decoding **any** data. The
149/// library leverages [`std::sync::Once`](https://doc.rust-lang.org/std/sync/struct.Once.html)
150/// to set the limit either when calling this method, or when decoding for
151/// the first time.
152pub fn max_allocation_bytes(num_bytes: usize) -> usize {
153    MAX_ALLOCATION_BYTES_ONCE.call_once(|| {
154        MAX_ALLOCATION_BYTES.store(num_bytes, Ordering::Release);
155    });
156    MAX_ALLOCATION_BYTES.load(Ordering::Acquire)
157}
158
159pub fn safe_len(len: usize) -> AvroResult<usize> {
160    let max_bytes = max_allocation_bytes(DEFAULT_MAX_ALLOCATION_BYTES);
161
162    if len <= max_bytes {
163        Ok(len)
164    } else {
165        Err(Error::MemoryAllocation {
166            desired: len,
167            maximum: max_bytes,
168        })
169    }
170}
171
172/// Set whether serializing/deserializing is marked as human readable in serde traits.
173/// This will adjust the return value of `is_human_readable()` for both.
174/// Once called, the value cannot be changed.
175///
176/// **NOTE** This function must be called before serializing/deserializing **any** data. The
177/// library leverages [`std::sync::Once`](https://doc.rust-lang.org/std/sync/struct.Once.html)
178/// to set the limit either when calling this method, or when decoding for
179/// the first time.
180pub fn set_serde_human_readable(human_readable: bool) {
181    SERDE_HUMAN_READABLE_ONCE.call_once(|| {
182        SERDE_HUMAN_READABLE.store(human_readable, Ordering::Release);
183    });
184}
185
/// Returns the current value of the `SERDE_HUMAN_READABLE` flag.
///
/// The flag is `true` unless it has been changed, e.g. through
/// [`set_serde_human_readable`].
pub(crate) fn is_human_readable() -> bool {
    SERDE_HUMAN_READABLE.load(Ordering::Acquire)
}
189
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;

    /// An `i32` and the equal `i64` must encode to the same bytes.
    #[test]
    fn test_zigzag() {
        let mut a = Vec::new();
        let mut b = Vec::new();
        zig_i32(42i32, &mut a).unwrap();
        zig_i64(42i64, &mut b).unwrap();
        assert_eq!(a, b);
    }

    /// Known encodings around the one-byte, `i32` and `i64` boundaries.
    #[test]
    fn test_zig_i64() {
        let mut s = Vec::new();

        zig_i64(0, &mut s).unwrap();
        assert_eq!(s, [0]);

        s.clear();
        zig_i64(-1, &mut s).unwrap();
        assert_eq!(s, [1]);

        s.clear();
        zig_i64(1, &mut s).unwrap();
        assert_eq!(s, [2]);

        s.clear();
        zig_i64(-64, &mut s).unwrap();
        assert_eq!(s, [127]);

        s.clear();
        zig_i64(64, &mut s).unwrap();
        assert_eq!(s, [128, 1]);

        s.clear();
        zig_i64(i32::MAX as i64, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 15]);

        s.clear();
        zig_i64(i32::MAX as i64 + 1, &mut s).unwrap();
        assert_eq!(s, [128, 128, 128, 128, 16]);

        s.clear();
        zig_i64(i32::MIN as i64, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 15]);

        s.clear();
        zig_i64(i32::MIN as i64 - 1, &mut s).unwrap();
        assert_eq!(s, [129, 128, 128, 128, 16]);

        s.clear();
        zig_i64(i64::MAX, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 255, 255, 255, 255, 255, 1]);

        s.clear();
        zig_i64(i64::MIN, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 255, 255, 255, 255, 255, 1]);
    }

    /// Known encodings around the four-byte/five-byte and `i32` boundaries.
    #[test]
    fn test_zig_i32() {
        let mut s = Vec::new();
        zig_i32(i32::MAX / 2, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 7]);

        s.clear();
        zig_i32(i32::MIN / 2, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 7]);

        s.clear();
        zig_i32(-(i32::MIN / 2), &mut s).unwrap();
        assert_eq!(s, [128, 128, 128, 128, 8]);

        s.clear();
        zig_i32(i32::MIN / 2 - 1, &mut s).unwrap();
        assert_eq!(s, [129, 128, 128, 128, 8]);

        s.clear();
        zig_i32(i32::MAX, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 15]);

        s.clear();
        zig_i32(i32::MIN, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 15]);
    }

    #[test]
    fn test_overflow() {
        // Five bytes that all ask for a continuation, followed by end of input:
        // decoding must fail rather than panic on the shift.
        let causes_left_shift_overflow: &[u8] = &[0xe1, 0xe1, 0xe1, 0xe1, 0xe1];
        assert!(decode_variable(&mut &*causes_left_shift_overflow).is_err());

        // The input above runs out of bytes before the ten-byte limit is
        // reached, so it never exercises the overflow guard itself. Ten
        // continuation bytes do.
        let too_many_groups: &[u8] = &[0xff; 10];
        assert!(matches!(
            decode_variable(&mut &*too_many_groups),
            Err(Error::IntegerOverflow)
        ));
    }

    /// Whatever `zig_i64` writes, `zag_i64` must read back unchanged.
    #[test]
    fn test_zig_zag_round_trip() -> TestResult {
        for value in [
            0,
            1,
            -1,
            63,
            -64,
            64,
            i64::from(i32::MAX),
            i64::from(i32::MIN),
            i64::MAX,
            i64::MIN,
        ] {
            let mut encoded = Vec::new();
            zig_i64(value, &mut encoded)?;
            assert_eq!(value, zag_i64(&mut &encoded[..])?);
        }

        Ok(())
    }

    /// `zag_i32` must reject encoded values that do not fit in an `i32`.
    #[test]
    fn test_zag_i32_out_of_range() -> TestResult {
        let mut encoded = Vec::new();
        zig_i64(i64::from(i32::MAX) + 1, &mut encoded)?;
        assert!(matches!(
            zag_i32(&mut &encoded[..]),
            Err(Error::ZagI32(_, _))
        ));

        Ok(())
    }

    #[test]
    fn test_safe_len() -> TestResult {
        assert_eq!(42usize, safe_len(42usize)?);
        // 1 GiB is above the 512 MiB default limit.
        assert!(safe_len(1024 * 1024 * 1024).is_err());

        Ok(())
    }
}