1use crate::{AvroResult, error::Details, schema::Documentation};
21use serde_json::{Map, Value};
22use std::{
23 io::{Read, Write},
24 sync::OnceLock,
25};
26
/// Limit, in bytes, that [`safe_len`] passes to [`max_allocation_bytes`]:
/// 512 MiB. It only takes effect if no other limit was stored first.
pub const DEFAULT_MAX_ALLOCATION_BYTES: usize = 512 * 1024 * 1024;

/// Value [`is_human_readable`] stores and reports when
/// [`set_serde_human_readable`] has not stored one before it.
pub const DEFAULT_SERDE_HUMAN_READABLE: bool = false;

/// Write-once storage for the allocation limit returned by
/// [`max_allocation_bytes`].
static MAX_ALLOCATION_BYTES: OnceLock<usize> = OnceLock::new();

/// Write-once storage for the flag returned by [`is_human_readable`].
pub(crate) static SERDE_HUMAN_READABLE: OnceLock<bool> = OnceLock::new();
41
42pub(crate) trait MapHelper {
43 fn string(&self, key: &str) -> Option<String>;
44
45 fn name(&self) -> Option<String> {
46 self.string("name")
47 }
48
49 fn doc(&self) -> Documentation {
50 self.string("doc")
51 }
52
53 fn aliases(&self) -> Option<Vec<String>>;
54}
55
56impl MapHelper for Map<String, Value> {
57 fn string(&self, key: &str) -> Option<String> {
58 self.get(key)
59 .and_then(|v| v.as_str())
60 .map(|v| v.to_string())
61 }
62
63 fn aliases(&self) -> Option<Vec<String>> {
64 self.get("aliases")
66 .and_then(|aliases| aliases.as_array())
67 .and_then(|aliases| {
68 aliases
69 .iter()
70 .map(|alias| alias.as_str())
71 .map(|alias| alias.map(|a| a.to_string()))
72 .collect::<Option<_>>()
73 })
74 }
75}
76
77pub(crate) fn read_long<R: Read>(reader: &mut R) -> AvroResult<i64> {
78 zag_i64(reader)
79}
80
81pub(crate) fn zig_i32<W: Write>(n: i32, buffer: W) -> AvroResult<usize> {
83 zig_i64(n as i64, buffer)
84}
85
86pub(crate) fn zig_i64<W: Write>(n: i64, writer: W) -> AvroResult<usize> {
88 let zigzagged = ((n << 1) ^ (n >> 63)) as u64;
89 encode_variable(zigzagged, writer)
90}
91
92pub(crate) fn zag_i32<R: Read>(reader: &mut R) -> AvroResult<i32> {
94 let i = zag_i64(reader)?;
95 i32::try_from(i).map_err(|e| Details::ZagI32(e, i).into())
96}
97
98pub(crate) fn zag_i64<R: Read>(reader: &mut R) -> AvroResult<i64> {
100 let z = decode_variable(reader)?;
101 Ok(if z & 0x1 == 0 {
102 (z >> 1) as i64
103 } else {
104 !(z >> 1) as i64
105 })
106}
107
108fn encode_variable<W: Write>(mut zigzagged: u64, mut writer: W) -> AvroResult<usize> {
112 zigzagged = zigzagged.to_le();
114 let mut buffer = [0u8; 10];
116 let mut i: usize = 0;
117 loop {
118 if zigzagged <= 0x7F {
119 buffer[i] = (zigzagged & 0x7F) as u8;
120 i += 1;
121 break;
122 } else {
123 buffer[i] = (0x80 | (zigzagged & 0x7F)) as u8;
124 i += 1;
125 zigzagged >>= 7;
126 }
127 }
128 writer
129 .write(&buffer[..i])
130 .map_err(|e| Details::WriteBytes(e).into())
131}
132
133fn decode_variable<R: Read>(reader: &mut R) -> AvroResult<u64> {
137 let mut i = 0u64;
138 let mut buf = [0u8; 1];
139
140 let mut j = 0;
141 loop {
142 if j > 9 {
143 return Err(Details::IntegerOverflow.into());
145 }
146 reader
147 .read_exact(&mut buf[..])
148 .map_err(Details::ReadVariableIntegerBytes)?;
149 i |= (u64::from(buf[0] & 0x7F)) << (j * 7);
150 if (buf[0] >> 7) == 0 {
151 break;
152 } else {
153 j += 1;
154 }
155 }
156
157 Ok(u64::from_le(i))
158}
159
160pub fn max_allocation_bytes(num_bytes: usize) -> usize {
170 *MAX_ALLOCATION_BYTES.get_or_init(|| num_bytes)
171}
172
173pub(crate) fn safe_len(len: usize) -> AvroResult<usize> {
174 let max_bytes = max_allocation_bytes(DEFAULT_MAX_ALLOCATION_BYTES);
175
176 if len <= max_bytes {
177 Ok(len)
178 } else {
179 Err(Details::MemoryAllocation {
180 desired: len,
181 maximum: max_bytes,
182 }
183 .into())
184 }
185}
186
187pub fn set_serde_human_readable(human_readable: bool) -> bool {
200 *SERDE_HUMAN_READABLE.get_or_init(|| human_readable)
201}
202
203pub(crate) fn is_human_readable() -> bool {
204 *SERDE_HUMAN_READABLE.get_or_init(|| DEFAULT_SERDE_HUMAN_READABLE)
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;

    /// An `i32` and the same value as `i64` must produce identical bytes.
    #[test]
    fn test_zigzag() {
        let mut a = Vec::new();
        let mut b = Vec::new();
        zig_i32(42i32, &mut a).unwrap();
        zig_i64(42i64, &mut b).unwrap();
        assert_eq!(a, b);
    }

    /// Pins the exact bytes for values around the 1-, 5- and 10-byte
    /// encoding boundaries.
    #[test]
    fn test_zig_i64() {
        let mut s = Vec::new();

        zig_i64(0, &mut s).unwrap();
        assert_eq!(s, [0]);

        s.clear();
        zig_i64(-1, &mut s).unwrap();
        assert_eq!(s, [1]);

        s.clear();
        zig_i64(1, &mut s).unwrap();
        assert_eq!(s, [2]);

        s.clear();
        zig_i64(-64, &mut s).unwrap();
        assert_eq!(s, [127]);

        s.clear();
        zig_i64(64, &mut s).unwrap();
        assert_eq!(s, [128, 1]);

        s.clear();
        zig_i64(i32::MAX as i64, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 15]);

        s.clear();
        zig_i64(i32::MAX as i64 + 1, &mut s).unwrap();
        assert_eq!(s, [128, 128, 128, 128, 16]);

        s.clear();
        zig_i64(i32::MIN as i64, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 15]);

        s.clear();
        zig_i64(i32::MIN as i64 - 1, &mut s).unwrap();
        assert_eq!(s, [129, 128, 128, 128, 16]);

        s.clear();
        zig_i64(i64::MAX, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 255, 255, 255, 255, 255, 1]);

        s.clear();
        zig_i64(i64::MIN, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 255, 255, 255, 255, 255, 1]);
    }

    /// Pins the exact bytes for `i32` values around the 5-byte boundary.
    #[test]
    fn test_zig_i32() {
        let mut s = Vec::new();
        zig_i32(i32::MAX / 2, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 7]);

        s.clear();
        zig_i32(i32::MIN / 2, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 7]);

        s.clear();
        zig_i32(-(i32::MIN / 2), &mut s).unwrap();
        assert_eq!(s, [128, 128, 128, 128, 8]);

        s.clear();
        zig_i32(i32::MIN / 2 - 1, &mut s).unwrap();
        assert_eq!(s, [129, 128, 128, 128, 8]);

        s.clear();
        zig_i32(i32::MAX, &mut s).unwrap();
        assert_eq!(s, [254, 255, 255, 255, 15]);

        s.clear();
        zig_i32(i32::MIN, &mut s).unwrap();
        assert_eq!(s, [255, 255, 255, 255, 15]);
    }

    /// Decoding must round-trip what the encoder produced.
    #[test]
    fn test_zig_zag_round_trip() {
        for value in [
            0i64,
            -1,
            1,
            -64,
            64,
            i32::MIN as i64,
            i32::MAX as i64,
            i64::MIN,
            i64::MAX,
        ] {
            let mut encoded = Vec::new();
            zig_i64(value, &mut encoded).unwrap();
            assert_eq!(value, zag_i64(&mut &encoded[..]).unwrap());
        }
    }

    #[test]
    fn test_overflow() {
        // Every byte announces a continuation but the input ends after five
        // bytes, so decoding fails while reading the sixth.
        let causes_left_shift_overflow: &[u8] = &[0xe1, 0xe1, 0xe1, 0xe1, 0xe1];
        assert!(decode_variable(&mut &*causes_left_shift_overflow).is_err());

        // Ten continuation bytes: all reads succeed, so this is rejected by
        // the length limit itself rather than by running out of input.
        let too_many_continuations: &[u8] = &[0x80; 10];
        assert!(decode_variable(&mut &*too_many_continuations).is_err());
    }

    #[test]
    fn test_safe_len() -> TestResult {
        assert_eq!(42usize, safe_len(42usize)?);
        assert!(safe_len(1024 * 1024 * 1024).is_err());

        Ok(())
    }
}