// apache_avro/reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19
20mod block;
21pub mod single_object;
22
23use crate::{
24    AvroResult,
25    decode::{decode, decode_internal},
26    schema::{ResolvedSchema, Schema},
27    types::Value,
28};
29use block::Block;
30use bon::bon;
31use std::{collections::HashMap, io::Read};
32
33/// Main interface for reading Avro formatted values.
34///
35/// To be used as an iterator:
36///
37/// ```no_run
38/// # use apache_avro::Reader;
39/// # use std::io::Cursor;
40/// # let input = Cursor::new(Vec::<u8>::new());
41/// for value in Reader::new(input).unwrap() {
42///     match value {
43///         Ok(v) => println!("{:?}", v),
44///         Err(e) => println!("Error: {}", e),
45///     };
46/// }
47/// ```
48pub struct Reader<'a, R> {
49    block: Block<'a, R>,
50    reader_schema: Option<&'a Schema>,
51    errored: bool,
52    should_resolve_schema: bool,
53}
54
55#[bon]
56impl<'a, R: Read> Reader<'a, R> {
57    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
58    /// No reader `Schema` will be set.
59    ///
60    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
61    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
62        Reader::builder(reader).build()
63    }
64
65    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
66    /// With an optional reader `Schema` and optional schemata to use for resolving schema
67    /// references.
68    ///
69    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
70    #[builder(finish_fn = build)]
71    pub fn builder(
72        #[builder(start_fn)] reader: R,
73        reader_schema: Option<&'a Schema>,
74        schemata: Option<Vec<&'a Schema>>,
75    ) -> AvroResult<Reader<'a, R>> {
76        let schemata =
77            schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default());
78
79        let block = Block::new(reader, schemata)?;
80        let mut reader = Reader {
81            block,
82            reader_schema,
83            errored: false,
84            should_resolve_schema: false,
85        };
86        // Check if the reader and writer schemas disagree.
87        reader.should_resolve_schema =
88            reader_schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema);
89        Ok(reader)
90    }
91
92    /// Get a reference to the writer `Schema`.
93    #[inline]
94    pub fn writer_schema(&self) -> &Schema {
95        &self.block.writer_schema
96    }
97
98    /// Get a reference to the optional reader `Schema`.
99    #[inline]
100    pub fn reader_schema(&self) -> Option<&Schema> {
101        self.reader_schema
102    }
103
104    /// Get a reference to the user metadata
105    #[inline]
106    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
107        &self.block.user_metadata
108    }
109
110    #[inline]
111    fn read_next(&mut self) -> AvroResult<Option<Value>> {
112        let read_schema = if self.should_resolve_schema {
113            self.reader_schema
114        } else {
115            None
116        };
117
118        self.block.read_next(read_schema)
119    }
120}
121
122impl<R: Read> Iterator for Reader<'_, R> {
123    type Item = AvroResult<Value>;
124
125    fn next(&mut self) -> Option<Self::Item> {
126        // to prevent keep on reading after the first error occurs
127        if self.errored {
128            return None;
129        };
130        match self.read_next() {
131            Ok(opt) => opt.map(Ok),
132            Err(e) => {
133                self.errored = true;
134                Some(Err(e))
135            }
136        }
137    }
138}
139
140/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
141/// to read from.
142///
143/// In case a reader `Schema` is provided, schema resolution will also be performed.
144///
145/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
146/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
147/// you are doing, instead.
148pub fn from_avro_datum<R: Read>(
149    writer_schema: &Schema,
150    reader: &mut R,
151    reader_schema: Option<&Schema>,
152) -> AvroResult<Value> {
153    let value = decode(writer_schema, reader)?;
154    match reader_schema {
155        Some(schema) => value.resolve(schema),
156        None => Ok(value),
157    }
158}
159
160/// Decode a `Value` from raw Avro data.
161///
162/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
163/// schemata to resolve any dependencies.
164///
165/// When a reader `Schema` is provided, schema resolution will also be performed.
166pub fn from_avro_datum_schemata<R: Read>(
167    writer_schema: &Schema,
168    writer_schemata: Vec<&Schema>,
169    reader: &mut R,
170    reader_schema: Option<&Schema>,
171) -> AvroResult<Value> {
172    from_avro_datum_reader_schemata(
173        writer_schema,
174        writer_schemata,
175        reader,
176        reader_schema,
177        Vec::with_capacity(0),
178    )
179}
180
181/// Decode a `Value` from raw Avro data.
182///
183/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
184/// schemata to resolve any dependencies.
185///
186/// When a reader `Schema` is provided, schema resolution will also be performed.
187pub fn from_avro_datum_reader_schemata<R: Read>(
188    writer_schema: &Schema,
189    writer_schemata: Vec<&Schema>,
190    reader: &mut R,
191    reader_schema: Option<&Schema>,
192    reader_schemata: Vec<&Schema>,
193) -> AvroResult<Value> {
194    let rs = ResolvedSchema::try_from(writer_schemata)?;
195    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
196    match reader_schema {
197        Some(schema) => {
198            if reader_schemata.is_empty() {
199                value.resolve(schema)
200            } else {
201                value.resolve_schemata(schema, reader_schemata)
202            }
203        }
204        None => Ok(value),
205    }
206}
207
/// Reads the 16-byte sync marker from Avro bytes generated earlier by a `Writer`.
///
/// The marker is taken from the last 16 bytes of `bytes`; an Avro object container
/// ends with the sync marker that follows its final block.
///
/// # Panics
///
/// Panics if `bytes` is not strictly longer than 16 bytes, i.e. it cannot contain
/// anything besides a marker.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    const MARKER_LEN: usize = 16;
    assert!(
        bytes.len() > MARKER_LEN,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; MARKER_LEN];
    // `u8` is `Copy`, so a straight memcpy suffices (no per-element clone).
    marker.copy_from_slice(&bytes[bytes.len() - MARKER_LEN..]);
    marker
}
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::from_value;
    use crate::types::Record;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use serde::Deserialize;
    use std::io::Cursor;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// An object container with `SCHEMA` as the writer schema and the `null` codec.
    ///
    /// The layout is:
    /// - 163-byte header.
    /// - One block holding two records, `{a: 27, b: "foo"}` and `{a: 42, b: "bar"}`:
    ///   a 2-byte count/size prefix followed by 10 data bytes.
    /// - The trailing 16-byte sync marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);

        Ok(())
    }

    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            // `a_nullable_boolean` is deliberately omitted to simulate a missing key.
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None)?,
            Value::Union(1, Box::new(Value::Long(0)))
        );

        Ok(())
    }

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::builder(ENCODED).reader_schema(&schema).build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Collecting everything (instead of zipping with `enumerate`) also catches a
        // reader that yields too few or too many values.
        let values = reader.collect::<Result<Vec<_>, _>>()?;
        assert_eq!(values, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        assert!(
            Reader::builder(&mut invalid)
                .reader_schema(&schema)
                .build()
                .is_err()
        );

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        // Drops the trailing marker and the last 3 data bytes, truncating the block.
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::builder(&mut invalid)
            .reader_schema(&schema)
            .build()?;
        // The truncated block must surface exactly one error; the reader then stops.
        let results: Vec<_> = reader.collect();
        assert_eq!(results.len(), 1);
        assert!(results[0].is_err());

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        // The full 163-byte header plus the block's count/size prefix, but no block data.
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;
        // Reading the missing block data must surface exactly one error.
        let results: Vec<_> = reader.collect();
        assert_eq!(results.len(), 1);
        assert!(results[0].is_err());

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        match Reader::new(snappy_compressed_avro.as_slice()) {
            Err(err) => {
                assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
            }
            Ok(_) => panic!("Expected an error in the reading of the codec!"),
        }
    }
}