apache_avro/serde/deser_schema/
block.rs1use std::{borrow::Borrow, io::Read};
19
20use serde::de::{DeserializeSeed, MapAccess, SeqAccess};
21
22use super::{Config, SchemaAwareDeserializer};
23use crate::schema::MapSchema;
24use crate::{Error, Schema, schema::ArraySchema, util::zag_i64};
25
26pub struct BlockDeserializer<'s, 'r, R: Read, S: Borrow<Schema>> {
28 reader: &'r mut R,
29 schema: &'s Schema,
30 config: Config<'s, S>,
31 remaining: Option<u64>,
33}
34
35impl<'s, 'r, R: Read, S: Borrow<Schema>> BlockDeserializer<'s, 'r, R, S> {
36 pub fn array(
37 reader: &'r mut R,
38 schema: &'s ArraySchema,
39 config: Config<'s, S>,
40 ) -> Result<Self, Error> {
41 let schema = if let Schema::Ref { name } = schema.items.as_ref() {
42 config.get_schema(name)?
43 } else {
44 &schema.items
45 };
46 let remaining = Self::read_block_header(reader)?;
47 Ok(Self {
48 reader,
49 schema,
50 config,
51 remaining,
52 })
53 }
54
55 pub fn map(
56 reader: &'r mut R,
57 schema: &'s MapSchema,
58 config: Config<'s, S>,
59 ) -> Result<Self, Error> {
60 let schema = if let Schema::Ref { name } = schema.types.as_ref() {
61 config.get_schema(name)?
62 } else {
63 &schema.types
64 };
65 let remaining = Self::read_block_header(reader)?;
66 Ok(Self {
67 reader,
68 schema,
69 config,
70 remaining,
71 })
72 }
73
74 fn read_block_header(reader: &mut R) -> Result<Option<u64>, Error> {
75 let remaining = zag_i64(reader)?;
76 if remaining < 0 {
77 let _bytes = zag_i64(reader)?;
79 }
80 if remaining == 0 {
81 Ok(None)
83 } else {
84 Ok(Some(remaining.unsigned_abs()))
85 }
86 }
87}
88
89impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> SeqAccess<'de> for BlockDeserializer<'s, 'r, R, S> {
91 type Error = Error;
92
93 fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>, Self::Error>
94 where
95 T: DeserializeSeed<'de>,
96 {
97 if let Some(mut remaining) = self.remaining {
98 let value = seed.deserialize(SchemaAwareDeserializer::new(
99 self.reader,
100 self.schema,
101 self.config,
102 )?)?;
103 remaining -= 1;
104 if remaining == 0 {
105 self.remaining = Self::read_block_header(self.reader)?;
106 } else {
107 self.remaining = Some(remaining);
108 }
109 Ok(Some(value))
110 } else {
111 Ok(None)
112 }
113 }
114
115 fn size_hint(&self) -> Option<usize> {
116 self.remaining
117 .map(|x| usize::try_from(x).unwrap_or(usize::MAX))
118 }
119}
120
121impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> MapAccess<'de> for BlockDeserializer<'s, 'r, R, S> {
123 type Error = Error;
124
125 fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>, Self::Error>
126 where
127 K: DeserializeSeed<'de>,
128 {
129 if self.remaining.is_some() {
130 seed.deserialize(SchemaAwareDeserializer::new(
131 self.reader,
132 &Schema::String,
133 self.config,
134 )?)
135 .map(Some)
136 } else {
137 Ok(None)
138 }
139 }
140
141 fn next_value_seed<V>(&mut self, seed: V) -> Result<V::Value, Self::Error>
142 where
143 V: DeserializeSeed<'de>,
144 {
145 let mut remaining = self
146 .remaining
147 .expect("next_key returned None, next_value should not have been called");
148 let value = seed.deserialize(SchemaAwareDeserializer::new(
149 self.reader,
150 self.schema,
151 self.config,
152 )?)?;
153
154 remaining -= 1;
155 if remaining == 0 {
156 self.remaining = Self::read_block_header(self.reader)?;
157 } else {
158 self.remaining = Some(remaining);
159 }
160 Ok(value)
161 }
162
163 fn next_entry_seed<K, V>(
164 &mut self,
165 kseed: K,
166 vseed: V,
167 ) -> Result<Option<(K::Value, V::Value)>, Self::Error>
168 where
169 K: DeserializeSeed<'de>,
170 V: DeserializeSeed<'de>,
171 {
172 if let Some(mut remaining) = self.remaining {
173 let key = kseed.deserialize(SchemaAwareDeserializer::new(
174 self.reader,
175 &Schema::String,
176 self.config,
177 )?)?;
178 let value = vseed.deserialize(SchemaAwareDeserializer::new(
179 self.reader,
180 self.schema,
181 self.config,
182 )?)?;
183
184 remaining -= 1;
185 if remaining == 0 {
186 self.remaining = Self::read_block_header(self.reader)?;
187 } else {
188 self.remaining = Some(remaining);
189 }
190
191 Ok(Some((key, value)))
192 } else {
193 Ok(None)
194 }
195 }
196
197 fn size_hint(&self) -> Option<usize> {
198 self.remaining
199 .map(|x| usize::try_from(x).unwrap_or(usize::MAX))
200 }
201}