Skip to main content

apache_avro/serde/deser_schema/
block.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{borrow::Borrow, io::Read};
19
20use serde::de::{DeserializeSeed, MapAccess, SeqAccess};
21
22use super::{Config, SchemaAwareDeserializer};
23use crate::schema::MapSchema;
24use crate::{Error, Schema, schema::ArraySchema, util::zag_i64};
25
26/// Deserialize sequences from an Avro array.
27pub struct BlockDeserializer<'s, 'r, R: Read, S: Borrow<Schema>> {
28    reader: &'r mut R,
29    schema: &'s Schema,
30    config: Config<'s, S>,
31    /// Track where we are in reading the array
32    remaining: Option<u64>,
33}
34
35impl<'s, 'r, R: Read, S: Borrow<Schema>> BlockDeserializer<'s, 'r, R, S> {
36    pub fn array(
37        reader: &'r mut R,
38        schema: &'s ArraySchema,
39        config: Config<'s, S>,
40    ) -> Result<Self, Error> {
41        let schema = if let Schema::Ref { name } = schema.items.as_ref() {
42            config.get_schema(name)?
43        } else {
44            &schema.items
45        };
46        let remaining = Self::read_block_header(reader)?;
47        Ok(Self {
48            reader,
49            schema,
50            config,
51            remaining,
52        })
53    }
54
55    pub fn map(
56        reader: &'r mut R,
57        schema: &'s MapSchema,
58        config: Config<'s, S>,
59    ) -> Result<Self, Error> {
60        let schema = if let Schema::Ref { name } = schema.types.as_ref() {
61            config.get_schema(name)?
62        } else {
63            &schema.types
64        };
65        let remaining = Self::read_block_header(reader)?;
66        Ok(Self {
67            reader,
68            schema,
69            config,
70            remaining,
71        })
72    }
73
74    fn read_block_header(reader: &mut R) -> Result<Option<u64>, Error> {
75        let remaining = zag_i64(reader)?;
76        if remaining < 0 {
77            // If the block size is negative the next number is the size of the block in bytes
78            let _bytes = zag_i64(reader)?;
79        }
80        if remaining == 0 {
81            // If the block size is zero the array/map is finished
82            Ok(None)
83        } else {
84            Ok(Some(remaining.unsigned_abs()))
85        }
86    }
87}
88
89/// Deserialize as an array
90impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> SeqAccess<'de> for BlockDeserializer<'s, 'r, R, S> {
91    type Error = Error;
92
93    fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>, Self::Error>
94    where
95        T: DeserializeSeed<'de>,
96    {
97        if let Some(mut remaining) = self.remaining {
98            let value = seed.deserialize(SchemaAwareDeserializer::new(
99                self.reader,
100                self.schema,
101                self.config,
102            )?)?;
103            remaining -= 1;
104            if remaining == 0 {
105                self.remaining = Self::read_block_header(self.reader)?;
106            } else {
107                self.remaining = Some(remaining);
108            }
109            Ok(Some(value))
110        } else {
111            Ok(None)
112        }
113    }
114
115    fn size_hint(&self) -> Option<usize> {
116        self.remaining
117            .map(|x| usize::try_from(x).unwrap_or(usize::MAX))
118    }
119}
120
121/// Deserialize as a map
122impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> MapAccess<'de> for BlockDeserializer<'s, 'r, R, S> {
123    type Error = Error;
124
125    fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>, Self::Error>
126    where
127        K: DeserializeSeed<'de>,
128    {
129        if self.remaining.is_some() {
130            seed.deserialize(SchemaAwareDeserializer::new(
131                self.reader,
132                &Schema::String,
133                self.config,
134            )?)
135            .map(Some)
136        } else {
137            Ok(None)
138        }
139    }
140
141    fn next_value_seed<V>(&mut self, seed: V) -> Result<V::Value, Self::Error>
142    where
143        V: DeserializeSeed<'de>,
144    {
145        let mut remaining = self
146            .remaining
147            .expect("next_key returned None, next_value should not have been called");
148        let value = seed.deserialize(SchemaAwareDeserializer::new(
149            self.reader,
150            self.schema,
151            self.config,
152        )?)?;
153
154        remaining -= 1;
155        if remaining == 0 {
156            self.remaining = Self::read_block_header(self.reader)?;
157        } else {
158            self.remaining = Some(remaining);
159        }
160        Ok(value)
161    }
162
163    fn next_entry_seed<K, V>(
164        &mut self,
165        kseed: K,
166        vseed: V,
167    ) -> Result<Option<(K::Value, V::Value)>, Self::Error>
168    where
169        K: DeserializeSeed<'de>,
170        V: DeserializeSeed<'de>,
171    {
172        if let Some(mut remaining) = self.remaining {
173            let key = kseed.deserialize(SchemaAwareDeserializer::new(
174                self.reader,
175                &Schema::String,
176                self.config,
177            )?)?;
178            let value = vseed.deserialize(SchemaAwareDeserializer::new(
179                self.reader,
180                self.schema,
181                self.config,
182            )?)?;
183
184            remaining -= 1;
185            if remaining == 0 {
186                self.remaining = Self::read_block_header(self.reader)?;
187            } else {
188                self.remaining = Some(remaining);
189            }
190
191            Ok(Some((key, value)))
192        } else {
193            Ok(None)
194        }
195    }
196
197    fn size_hint(&self) -> Option<usize> {
198        self.remaining
199            .map(|x| usize::try_from(x).unwrap_or(usize::MAX))
200    }
201}