1use crate::{schema::Documentation, AvroResult, Error};
19use serde_json::{Map, Value};
20use std::{
21 io::{Read, Write},
22 sync::{
23 atomic::{AtomicBool, AtomicUsize, Ordering},
24 Once,
25 },
26};
27
/// Default allocation limit in bytes: 512 MiB.
pub const DEFAULT_MAX_ALLOCATION_BYTES: usize = 512 << 20;
/// Allocation limit currently in effect; read and (at most once) overwritten
/// through [`max_allocation_bytes`], and enforced by [`safe_len`].
static MAX_ALLOCATION_BYTES: AtomicUsize = AtomicUsize::new(DEFAULT_MAX_ALLOCATION_BYTES);
/// Ensures `MAX_ALLOCATION_BYTES` is stored to only by the first caller.
static MAX_ALLOCATION_BYTES_ONCE: Once = Once::new();

/// Flag reported by [`is_human_readable`]; starts as `true` and may be
/// overwritten once through [`set_serde_human_readable`].
pub(crate) static SERDE_HUMAN_READABLE: AtomicBool = AtomicBool::new(true);
/// Ensures `SERDE_HUMAN_READABLE` is stored to only by the first caller.
static SERDE_HUMAN_READABLE_ONCE: Once = Once::new();
42
43pub trait MapHelper {
44 fn string(&self, key: &str) -> Option<String>;
45
46 fn name(&self) -> Option<String> {
47 self.string("name")
48 }
49
50 fn doc(&self) -> Documentation {
51 self.string("doc")
52 }
53
54 fn aliases(&self) -> Option<Vec<String>>;
55}
56
57impl MapHelper for Map<String, Value> {
58 fn string(&self, key: &str) -> Option<String> {
59 self.get(key)
60 .and_then(|v| v.as_str())
61 .map(|v| v.to_string())
62 }
63
64 fn aliases(&self) -> Option<Vec<String>> {
65 self.get("aliases")
67 .and_then(|aliases| aliases.as_array())
68 .and_then(|aliases| {
69 aliases
70 .iter()
71 .map(|alias| alias.as_str())
72 .map(|alias| alias.map(|a| a.to_string()))
73 .collect::<Option<_>>()
74 })
75 }
76}
77
78pub fn read_long<R: Read>(reader: &mut R) -> AvroResult<i64> {
79 zag_i64(reader)
80}
81
82pub fn zig_i32<W: Write>(n: i32, buffer: W) -> AvroResult<usize> {
83 zig_i64(n as i64, buffer)
84}
85
86pub fn zig_i64<W: Write>(n: i64, writer: W) -> AvroResult<usize> {
87 encode_variable(((n << 1) ^ (n >> 63)) as u64, writer)
88}
89
90pub fn zag_i32<R: Read>(reader: &mut R) -> AvroResult<i32> {
91 let i = zag_i64(reader)?;
92 i32::try_from(i).map_err(|e| Error::ZagI32(e, i))
93}
94
95pub fn zag_i64<R: Read>(reader: &mut R) -> AvroResult<i64> {
96 let z = decode_variable(reader)?;
97 Ok(if z & 0x1 == 0 {
98 (z >> 1) as i64
99 } else {
100 !(z >> 1) as i64
101 })
102}
103
104fn encode_variable<W: Write>(mut z: u64, mut writer: W) -> AvroResult<usize> {
105 let mut buffer = [0u8; 10];
106 let mut i: usize = 0;
107 loop {
108 if z <= 0x7F {
109 buffer[i] = (z & 0x7F) as u8;
110 i += 1;
111 break;
112 } else {
113 buffer[i] = (0x80 | (z & 0x7F)) as u8;
114 i += 1;
115 z >>= 7;
116 }
117 }
118 writer.write(&buffer[..i]).map_err(Error::WriteBytes)
119}
120
121fn decode_variable<R: Read>(reader: &mut R) -> AvroResult<u64> {
122 let mut i = 0u64;
123 let mut buf = [0u8; 1];
124
125 let mut j = 0;
126 loop {
127 if j > 9 {
128 return Err(Error::IntegerOverflow);
130 }
131 reader
132 .read_exact(&mut buf[..])
133 .map_err(Error::ReadVariableIntegerBytes)?;
134 i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
135 if (buf[0] >> 7) == 0 {
136 break;
137 } else {
138 j += 1;
139 }
140 }
141
142 Ok(i)
143}
144
145pub fn max_allocation_bytes(num_bytes: usize) -> usize {
153 MAX_ALLOCATION_BYTES_ONCE.call_once(|| {
154 MAX_ALLOCATION_BYTES.store(num_bytes, Ordering::Release);
155 });
156 MAX_ALLOCATION_BYTES.load(Ordering::Acquire)
157}
158
159pub fn safe_len(len: usize) -> AvroResult<usize> {
160 let max_bytes = max_allocation_bytes(DEFAULT_MAX_ALLOCATION_BYTES);
161
162 if len <= max_bytes {
163 Ok(len)
164 } else {
165 Err(Error::MemoryAllocation {
166 desired: len,
167 maximum: max_bytes,
168 })
169 }
170}
171
172pub fn set_serde_human_readable(human_readable: bool) {
181 SERDE_HUMAN_READABLE_ONCE.call_once(|| {
182 SERDE_HUMAN_READABLE.store(human_readable, Ordering::Release);
183 });
184}
185
186pub(crate) fn is_human_readable() -> bool {
187 SERDE_HUMAN_READABLE.load(Ordering::Acquire)
188}
189
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;

    /// Encodes `n` with `zig_i64` into a fresh buffer.
    fn long_bytes(n: i64) -> Vec<u8> {
        let mut out = Vec::new();
        zig_i64(n, &mut out).unwrap();
        out
    }

    /// Encodes `n` with `zig_i32` into a fresh buffer.
    fn int_bytes(n: i32) -> Vec<u8> {
        let mut out = Vec::new();
        zig_i32(n, &mut out).unwrap();
        out
    }

    /// The same number must encode identically as an int and as a long.
    #[test]
    fn test_zigzag() {
        assert_eq!(int_bytes(42i32), long_bytes(42i64));
    }

    #[test]
    fn test_zig_i64() {
        // Single-byte encodings.
        assert_eq!(long_bytes(0), [0]);
        assert_eq!(long_bytes(-1), [1]);
        assert_eq!(long_bytes(1), [2]);
        assert_eq!(long_bytes(-64), [127]);

        // First value that needs a second byte.
        assert_eq!(long_bytes(64), [128, 1]);

        // Around the i32 boundaries.
        assert_eq!(long_bytes(i32::MAX as i64), [254, 255, 255, 255, 15]);
        assert_eq!(long_bytes(i32::MAX as i64 + 1), [128, 128, 128, 128, 16]);
        assert_eq!(long_bytes(i32::MIN as i64), [255, 255, 255, 255, 15]);
        assert_eq!(long_bytes(i32::MIN as i64 - 1), [129, 128, 128, 128, 16]);

        // Extremes of the i64 range use all ten bytes.
        assert_eq!(
            long_bytes(i64::MAX),
            [254, 255, 255, 255, 255, 255, 255, 255, 255, 1]
        );
        assert_eq!(
            long_bytes(i64::MIN),
            [255, 255, 255, 255, 255, 255, 255, 255, 255, 1]
        );
    }

    #[test]
    fn test_zig_i32() {
        assert_eq!(int_bytes(i32::MAX / 2), [254, 255, 255, 255, 7]);
        assert_eq!(int_bytes(i32::MIN / 2), [255, 255, 255, 255, 7]);
        assert_eq!(int_bytes(-(i32::MIN / 2)), [128, 128, 128, 128, 8]);
        assert_eq!(int_bytes(i32::MIN / 2 - 1), [129, 128, 128, 128, 8]);
        assert_eq!(int_bytes(i32::MAX), [254, 255, 255, 255, 15]);
        assert_eq!(int_bytes(i32::MIN), [255, 255, 255, 255, 15]);
    }

    /// Every byte has its continuation bit set, so decoding must fail.
    #[test]
    fn test_overflow() {
        let mut causes_left_shift_overflow: &[u8] = &[0xe1; 5];
        assert!(decode_variable(&mut causes_left_shift_overflow).is_err());
    }

    #[test]
    fn test_safe_len() -> TestResult {
        let accepted = safe_len(42usize)?;
        assert_eq!(42usize, accepted);

        let rejected = safe_len(1024 * 1024 * 1024);
        assert!(rejected.is_err());

        Ok(())
    }
}