Skip to main content

apache_avro/reader/
datum.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Read, marker::PhantomData};
19
20use bon::bon;
21use serde::de::DeserializeOwned;
22
23use crate::{
24    AvroResult, AvroSchema, Schema,
25    decode::decode_internal,
26    schema::{ResolvedOwnedSchema, ResolvedSchema},
27    serde::deser_schema::{Config, SchemaAwareDeserializer},
28    types::Value,
29    util::is_human_readable,
30};
31
32/// Reader for reading raw Avro data.
33///
34/// This is most likely not what you need. Most users should use [`Reader`][crate::Reader],
35/// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or
36/// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead.
37pub struct GenericDatumReader<'s> {
38    writer: &'s Schema,
39    resolved: ResolvedSchema<'s>,
40    reader: Option<(&'s Schema, ResolvedSchema<'s>)>,
41    human_readable: bool,
42}
43
44#[bon]
45impl<'s> GenericDatumReader<'s> {
46    /// Build a [`GenericDatumReader`].
47    ///
48    /// This is most likely not what you need. Most users should use [`Reader`][crate::Reader],
49    /// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or
50    /// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead.
51    #[builder]
52    pub fn new(
53        /// The schema that was used to write the Avro datum.
54        #[builder(start_fn)]
55        writer_schema: &'s Schema,
56        /// Already resolved schemata that will be used to resolve references in the writer's schema.
57        resolved_writer_schemata: Option<ResolvedSchema<'s>>,
58        /// The schema that will be used to resolve the value to conform the the new schema.
59        reader_schema: Option<&'s Schema>,
60        /// Already resolved schemata that will be used to resolve references in the reader's schema.
61        resolved_reader_schemata: Option<ResolvedSchema<'s>>,
62        /// Was the data serialized with `human_readable`.
63        #[builder(default = is_human_readable())]
64        human_readable: bool,
65    ) -> AvroResult<Self> {
66        let resolved_writer_schemata = if let Some(resolved) = resolved_writer_schemata {
67            resolved
68        } else {
69            ResolvedSchema::try_from(writer_schema)?
70        };
71
72        let reader = if let Some(reader) = reader_schema {
73            if let Some(resolved) = resolved_reader_schemata {
74                Some((reader, resolved))
75            } else {
76                Some((reader, ResolvedSchema::try_from(reader)?))
77            }
78        } else {
79            None
80        };
81
82        Ok(Self {
83            writer: writer_schema,
84            resolved: resolved_writer_schemata,
85            reader,
86            human_readable,
87        })
88    }
89}
90
91impl<'s, S: generic_datum_reader_builder::State> GenericDatumReaderBuilder<'s, S> {
92    /// Set the schemata that will be used to resolve any references in the writer's schema.
93    ///
94    /// This is equivalent to `.resolved_writer_schemata(ResolvedSchema::new_with_schemata(schemata)?)`.
95    /// If you already have a [`ResolvedSchema`], use that function instead.
96    pub fn writer_schemata(
97        self,
98        schemata: Vec<&'s Schema>,
99    ) -> AvroResult<
100        GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedWriterSchemata<S>>,
101    >
102    where
103        S::ResolvedWriterSchemata: generic_datum_reader_builder::IsUnset,
104    {
105        let resolved = ResolvedSchema::new_with_schemata(schemata)?;
106        Ok(self.resolved_writer_schemata(resolved))
107    }
108
109    /// Set the schemata that will be used to resolve any references in the reader's schema.
110    ///
111    /// This is equivalent to `.resolved_reader_schemata(ResolvedSchema::new_with_schemata(schemata)?)`.
112    /// If you already have a [`ResolvedSchema`], use that function instead.
113    ///
114    /// This function can only be called after the reader schema is set.
115    pub fn reader_schemata(
116        self,
117        schemata: Vec<&'s Schema>,
118    ) -> AvroResult<
119        GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedReaderSchemata<S>>,
120    >
121    where
122        S::ResolvedReaderSchemata: generic_datum_reader_builder::IsUnset,
123        S::ReaderSchema: generic_datum_reader_builder::IsSet,
124    {
125        let resolved = ResolvedSchema::new_with_schemata(schemata)?;
126        Ok(self.resolved_reader_schemata(resolved))
127    }
128}
129
130impl<'s> GenericDatumReader<'s> {
131    /// Read a Avro datum from the reader.
132    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
133        let value = decode_internal(self.writer, self.resolved.get_names(), None, reader)?;
134        if let Some((reader, resolved)) = &self.reader {
135            value.resolve_internal(reader, resolved.get_names(), None, &None)
136        } else {
137            Ok(value)
138        }
139    }
140
141    /// Read a Avro datum from the reader.
142    ///
143    /// # Panics
144    /// Will panic if a reader schema has been configured, this is a WIP.
145    pub fn read_deser<T: DeserializeOwned>(&self, reader: &mut impl Read) -> AvroResult<T> {
146        // `reader` is `impl Read` instead of a generic on the function like T so it's easier to
147        // specify the type wanted (`read_deser<String>` vs `read_deser<String, _>`)
148        if let Some((_, _)) = &self.reader {
149            // TODO: Implement SchemaAwareResolvingDeserializer
150            panic!("Schema aware deserialisation does not resolve schemas yet");
151        } else {
152            T::deserialize(SchemaAwareDeserializer::new(
153                reader,
154                self.writer,
155                Config {
156                    names: self.resolved.get_names(),
157                    human_readable: self.human_readable,
158                },
159            )?)
160        }
161    }
162}
163
164/// Reader for reading raw Avro data.
165///
166/// This is most likely not what you need. Most users should use [`Reader`][crate::Reader],
167/// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or
168/// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead.
169pub struct SpecificDatumReader<T: AvroSchema> {
170    resolved: ResolvedOwnedSchema,
171    human_readable: bool,
172    phantom: PhantomData<T>,
173}
174
175#[bon]
176impl<T: AvroSchema> SpecificDatumReader<T> {
177    /// Build a [`SpecificDatumReader`].
178    ///
179    /// This is most likely not what you need. Most users should use [`Reader`][crate::Reader],
180    /// [`GenericSingleObjectReader`][crate::GenericSingleObjectReader], or
181    /// [`SpecificSingleObjectReader`][crate::SpecificSingleObjectReader] instead.
182    #[builder]
183    pub fn new(#[builder(default = is_human_readable())] human_readable: bool) -> AvroResult<Self> {
184        Ok(Self {
185            resolved: T::get_schema().try_into()?,
186            human_readable,
187            phantom: PhantomData,
188        })
189    }
190}
191
192impl<T: AvroSchema + DeserializeOwned> SpecificDatumReader<T> {
193    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
194        T::deserialize(SchemaAwareDeserializer::new(
195            reader,
196            self.resolved.get_root_schema(),
197            Config {
198                names: self.resolved.get_names(),
199                human_readable: self.human_readable,
200            },
201        )?)
202    }
203}
204
205/// Deprecated.
206///
207/// This is equivalent to
208/// ```ignore
209/// GenericDatumReader::builder(writer_schema)
210///    .maybe_reader_schema(reader_schema)
211///    .build()?
212///    .read_value(reader)
213/// ```
214///
215/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
216/// to read from.
217///
218/// In case a reader `Schema` is provided, schema resolution will also be performed.
219///
220/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
221/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
222/// you are doing, instead.
223#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
224pub fn from_avro_datum<R: Read>(
225    writer_schema: &Schema,
226    reader: &mut R,
227    reader_schema: Option<&Schema>,
228) -> AvroResult<Value> {
229    GenericDatumReader::builder(writer_schema)
230        .maybe_reader_schema(reader_schema)
231        .build()?
232        .read_value(reader)
233}
234
235/// Deprecated.
236///
237/// This is equivalent to
238/// ```ignore
239/// GenericDatumReader::builder(writer_schema)
240///    .writer_schemata(writer_schemata)?
241///    .maybe_reader_schema(reader_schema)
242///    .build()?
243///    .read_value(reader)
244/// ```
245///
246/// Decode a `Value` from raw Avro data.
247///
248/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
249/// schemata to resolve any dependencies.
250///
251/// When a reader `Schema` is provided, schema resolution will also be performed.
252#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
253pub fn from_avro_datum_schemata<R: Read>(
254    writer_schema: &Schema,
255    writer_schemata: Vec<&Schema>,
256    reader: &mut R,
257    reader_schema: Option<&Schema>,
258) -> AvroResult<Value> {
259    GenericDatumReader::builder(writer_schema)
260        .writer_schemata(writer_schemata)?
261        .maybe_reader_schema(reader_schema)
262        .build()?
263        .read_value(reader)
264}
265
266/// Deprecated.
267///
268/// This is equivalent to
269/// ```ignore
270/// GenericDatumReader::builder(writer_schema)
271///    .writer_schemata(writer_schemata)?
272///    .maybe_reader_schema(reader_schema)
273///    .reader_schemata(reader_schemata)?
274///    .build()?
275///    .read_value(reader)
276/// ```
277///
278/// Decode a `Value` from raw Avro data.
279///
280/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
281/// schemata to resolve any dependencies.
282///
283/// When a reader `Schema` is provided, schema resolution will also be performed.
284#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
285pub fn from_avro_datum_reader_schemata<R: Read>(
286    writer_schema: &Schema,
287    writer_schemata: Vec<&Schema>,
288    reader: &mut R,
289    reader_schema: Option<&Schema>,
290    reader_schemata: Vec<&Schema>,
291) -> AvroResult<Value> {
292    GenericDatumReader::builder(writer_schema)
293        .writer_schemata(writer_schemata)?
294        .maybe_reader_schema(reader_schema)
295        .reader_schemata(reader_schemata)?
296        .build()?
297        .read_value(reader)
298}
299
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use serde::Deserialize;

    use crate::{
        Schema,
        reader::datum::GenericDatumReader,
        types::{Record, Value},
    };

    /// A record decoded without a reader schema equals the record built by hand.
    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(
            r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "a",
                    "type": "long",
                    "default": 42
                },
                {
                    "name": "b",
                    "type": "string"
                }
            ]
        }"#,
        )?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected: Value = record.into();

        let datum_reader = GenericDatumReader::builder(&schema).build()?;
        let avro_datum = datum_reader.read_value(&mut encoded)?;

        assert_eq!(avro_datum, expected);

        Ok(())
    }

    /// Deserializing into a struct fails when the input ends before all schema fields are read.
    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "TestRecord3240",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            // we are missing the 'a_nullable_boolean' field to simulate missing keys
            // a_nullable_boolean: Option<bool>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &[u8] = &[54, 6, 102, 111, 111];

        let datum_reader = GenericDatumReader::builder(&schema).build()?;
        let error = datum_reader
            .read_deser::<TestRecord3240>(&mut encoded)
            .unwrap_err();
        // TODO: Create a version of this test that does schema resolution
        assert_eq!(
            error.to_string(),
            "Failed to read bytes for decoding variable length integer: failed to fill whole buffer"
        );

        Ok(())
    }

    /// A `["null", "long"]` union with branch index 1 decodes to the long variant.
    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(r#"["null", "long"]"#)?;
        let mut encoded: &'static [u8] = &[2, 0];

        let datum_reader = GenericDatumReader::builder(&schema).build()?;
        let avro_datum = datum_reader.read_value(&mut encoded)?;
        assert_eq!(avro_datum, Value::Union(1, Box::new(Value::Long(0))));

        Ok(())
    }
}