// Source path: apache_avro/reader/datum.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::io::Read;

use bon::bon;

use crate::{AvroResult, Schema, decode::decode_internal, schema::ResolvedSchema, types::Value};

24/// Reader for reading raw Avro data.
25///
26/// This is most likely not what you need. Most users should use [`Reader`][crate::Reader],
27/// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or
28/// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead.
29pub struct GenericDatumReader<'s> {
30    writer: &'s Schema,
31    resolved: ResolvedSchema<'s>,
32    reader: Option<(&'s Schema, ResolvedSchema<'s>)>,
33}
34
35#[bon]
36impl<'s> GenericDatumReader<'s> {
37    /// Build a [`GenericDatumReader`].
38    ///
39    /// This is most likely not what you need. Most users should use [`Reader`][crate::Reader],
40    /// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or
41    /// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead.
42    #[builder]
43    pub fn new(
44        /// The schema that was used to write the Avro datum.
45        #[builder(start_fn)]
46        writer_schema: &'s Schema,
47        /// Already resolved schemata that will be used to resolve references in the writer's schema.
48        resolved_writer_schemata: Option<ResolvedSchema<'s>>,
49        /// The schema that will be used to resolve the value to conform the the new schema.
50        reader_schema: Option<&'s Schema>,
51        /// Already resolved schemata that will be used to resolve references in the reader's schema.
52        resolved_reader_schemata: Option<ResolvedSchema<'s>>,
53    ) -> AvroResult<Self> {
54        let resolved_writer_schemata = if let Some(resolved) = resolved_writer_schemata {
55            resolved
56        } else {
57            ResolvedSchema::try_from(writer_schema)?
58        };
59
60        let reader = if let Some(reader) = reader_schema {
61            if let Some(resolved) = resolved_reader_schemata {
62                Some((reader, resolved))
63            } else {
64                Some((reader, ResolvedSchema::try_from(reader)?))
65            }
66        } else {
67            None
68        };
69
70        Ok(Self {
71            writer: writer_schema,
72            resolved: resolved_writer_schemata,
73            reader,
74        })
75    }
76}
77
78impl<'s, S: generic_datum_reader_builder::State> GenericDatumReaderBuilder<'s, S> {
79    /// Set the schemata that will be used to resolve any references in the writer's schema.
80    ///
81    /// This is equivalent to `.resolved_writer_schemata(ResolvedSchema::new_with_schemata(schemata)?)`.
82    /// If you already have a [`ResolvedSchema`], use that function instead.
83    pub fn writer_schemata(
84        self,
85        schemata: Vec<&'s Schema>,
86    ) -> AvroResult<
87        GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedWriterSchemata<S>>,
88    >
89    where
90        S::ResolvedWriterSchemata: generic_datum_reader_builder::IsUnset,
91    {
92        let resolved = ResolvedSchema::new_with_schemata(schemata)?;
93        Ok(self.resolved_writer_schemata(resolved))
94    }
95
96    /// Set the schemata that will be used to resolve any references in the reader's schema.
97    ///
98    /// This is equivalent to `.resolved_reader_schemata(ResolvedSchema::new_with_schemata(schemata)?)`.
99    /// If you already have a [`ResolvedSchema`], use that function instead.
100    ///
101    /// This function can only be called after the reader schema is set.
102    pub fn reader_schemata(
103        self,
104        schemata: Vec<&'s Schema>,
105    ) -> AvroResult<
106        GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedReaderSchemata<S>>,
107    >
108    where
109        S::ResolvedReaderSchemata: generic_datum_reader_builder::IsUnset,
110        S::ReaderSchema: generic_datum_reader_builder::IsSet,
111    {
112        let resolved = ResolvedSchema::new_with_schemata(schemata)?;
113        Ok(self.resolved_reader_schemata(resolved))
114    }
115}
116
117impl<'s> GenericDatumReader<'s> {
118    /// Read a Avro datum from the reader.
119    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
120        let value = decode_internal(self.writer, self.resolved.get_names(), None, reader)?;
121        if let Some((reader, resolved)) = &self.reader {
122            value.resolve_internal(reader, resolved.get_names(), None, &None)
123        } else {
124            Ok(value)
125        }
126    }
127}
128
129/// Deprecated.
130///
131/// This is equivalent to
132/// ```ignore
133/// GenericDatumReader::builder(writer_schema)
134///    .maybe_reader_schema(reader_schema)
135///    .build()?
136///    .read_value(reader)
137/// ```
138///
139/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
140/// to read from.
141///
142/// In case a reader `Schema` is provided, schema resolution will also be performed.
143///
144/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
145/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
146/// you are doing, instead.
147#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
148pub fn from_avro_datum<R: Read>(
149    writer_schema: &Schema,
150    reader: &mut R,
151    reader_schema: Option<&Schema>,
152) -> AvroResult<Value> {
153    GenericDatumReader::builder(writer_schema)
154        .maybe_reader_schema(reader_schema)
155        .build()?
156        .read_value(reader)
157}
158
159/// Deprecated.
160///
161/// This is equivalent to
162/// ```ignore
163/// GenericDatumReader::builder(writer_schema)
164///    .writer_schemata(writer_schemata)?
165///    .maybe_reader_schema(reader_schema)
166///    .build()?
167///    .read_value(reader)
168/// ```
169///
170/// Decode a `Value` from raw Avro data.
171///
172/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
173/// schemata to resolve any dependencies.
174///
175/// When a reader `Schema` is provided, schema resolution will also be performed.
176#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
177pub fn from_avro_datum_schemata<R: Read>(
178    writer_schema: &Schema,
179    writer_schemata: Vec<&Schema>,
180    reader: &mut R,
181    reader_schema: Option<&Schema>,
182) -> AvroResult<Value> {
183    GenericDatumReader::builder(writer_schema)
184        .writer_schemata(writer_schemata)?
185        .maybe_reader_schema(reader_schema)
186        .build()?
187        .read_value(reader)
188}
189
190/// Deprecated.
191///
192/// This is equivalent to
193/// ```ignore
194/// GenericDatumReader::builder(writer_schema)
195///    .writer_schemata(writer_schemata)?
196///    .maybe_reader_schema(reader_schema)
197///    .reader_schemata(reader_schemata)?
198///    .build()?
199///    .read_value(reader)
200/// ```
201///
202/// Decode a `Value` from raw Avro data.
203///
204/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
205/// schemata to resolve any dependencies.
206///
207/// When a reader `Schema` is provided, schema resolution will also be performed.
208#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
209pub fn from_avro_datum_reader_schemata<R: Read>(
210    writer_schema: &Schema,
211    writer_schemata: Vec<&Schema>,
212    reader: &mut R,
213    reader_schema: Option<&Schema>,
214    reader_schemata: Vec<&Schema>,
215) -> AvroResult<Value> {
216    GenericDatumReader::builder(writer_schema)
217        .writer_schemata(writer_schemata)?
218        .maybe_reader_schema(reader_schema)
219        .reader_schemata(reader_schemata)?
220        .build()?
221        .read_value(reader)
222}
223
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use serde::Deserialize;

    use crate::{
        Schema, from_value,
        reader::datum::GenericDatumReader,
        types::{Record, Value},
    };

    /// Decoding a raw datum with only a writer schema yields the expected record.
    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(
            r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "a",
                    "type": "long",
                    "default": 42
                },
                {
                    "name": "b",
                    "type": "string"
                }
            ]
        }"#,
        )?;
        // 54 is the zig-zag varint for 27 (`a`); 6 is the zig-zag length 3 followed by the
        // UTF-8 bytes of "foo" (`b`).
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).expect("the parsed schema is a record");
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        let avro_datum = GenericDatumReader::builder(&schema)
            .build()?
            .read_value(&mut encoded)?;

        assert_eq!(avro_datum, expected);

        Ok(())
    }

    /// A decoded record can be mapped onto a struct that omits one of the schema's fields.
    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            // we are missing the 'a_nullable_boolean' field to simulate missing keys
            // a_nullable_boolean: Option<bool>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        // Same bytes as in `test_from_avro_datum`: only `a` (27) and `b` ("foo") are present.
        // NOTE(review): presumably the decoder treats the absent trailing unions as null;
        // confirm against the union handling in `decode`.
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = GenericDatumReader::builder(&schema)
            .build()?
            .read_value(&mut encoded)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    /// A top-level union datum decodes to the selected branch.
    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(r#"["null", "long"]"#)?;
        // 2 is the zig-zag varint for branch index 1 ("long"); 0 is the long value 0.
        let mut encoded: &'static [u8] = &[2, 0];

        let avro_datum = GenericDatumReader::builder(&schema)
            .build()?
            .read_value(&mut encoded)?;
        assert_eq!(avro_datum, Value::Union(1, Box::new(Value::Long(0))));

        Ok(())
    }
}