apache_avro/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19
20mod block;
21pub mod datum;
22pub mod single_object;
23
24use crate::{AvroResult, schema::Schema, types::Value};
25use block::Block;
26use bon::bon;
27use std::{collections::HashMap, io::Read};
28
29/// Main interface for reading Avro formatted values.
30///
31/// To be used as an iterator:
32///
33/// ```no_run
34/// # use apache_avro::Reader;
35/// # use std::io::Cursor;
36/// # let input = Cursor::new(Vec::<u8>::new());
37/// for value in Reader::new(input).unwrap() {
38///     match value {
39///         Ok(v) => println!("{:?}", v),
40///         Err(e) => println!("Error: {}", e),
41///     };
42/// }
43/// ```
44pub struct Reader<'a, R> {
45    block: Block<'a, R>,
46    reader_schema: Option<&'a Schema>,
47    errored: bool,
48    should_resolve_schema: bool,
49}
50
51#[bon]
52impl<'a, R: Read> Reader<'a, R> {
53    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
54    /// No reader `Schema` will be set.
55    ///
56    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
57    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
58        Reader::builder(reader).build()
59    }
60
61    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
62    /// With an optional reader `Schema` and optional schemata to use for resolving schema
63    /// references.
64    ///
65    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
66    #[builder(finish_fn = build)]
67    pub fn builder(
68        #[builder(start_fn)] reader: R,
69        reader_schema: Option<&'a Schema>,
70        schemata: Option<Vec<&'a Schema>>,
71    ) -> AvroResult<Reader<'a, R>> {
72        let schemata =
73            schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default());
74
75        let block = Block::new(reader, schemata)?;
76        let mut reader = Reader {
77            block,
78            reader_schema,
79            errored: false,
80            should_resolve_schema: false,
81        };
82        // Check if the reader and writer schemas disagree.
83        reader.should_resolve_schema =
84            reader_schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema);
85        Ok(reader)
86    }
87
88    /// Get a reference to the writer `Schema`.
89    #[inline]
90    pub fn writer_schema(&self) -> &Schema {
91        &self.block.writer_schema
92    }
93
94    /// Get a reference to the optional reader `Schema`.
95    #[inline]
96    pub fn reader_schema(&self) -> Option<&Schema> {
97        self.reader_schema
98    }
99
100    /// Get a reference to the user metadata
101    #[inline]
102    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
103        &self.block.user_metadata
104    }
105
106    #[inline]
107    fn read_next(&mut self) -> AvroResult<Option<Value>> {
108        let read_schema = if self.should_resolve_schema {
109            self.reader_schema
110        } else {
111            None
112        };
113
114        self.block.read_next(read_schema)
115    }
116}
117
118impl<R: Read> Iterator for Reader<'_, R> {
119    type Item = AvroResult<Value>;
120
121    fn next(&mut self) -> Option<Self::Item> {
122        // to prevent keep on reading after the first error occurs
123        if self.errored {
124            return None;
125        };
126        match self.read_next() {
127            Ok(opt) => opt.map(Ok),
128            Err(e) => {
129                self.errored = true;
130                Some(Err(e))
131            }
132        }
133    }
134}
135
/// Reads the 16 marker bytes from Avro bytes generated earlier by a `Writer`.
///
/// The marker is taken from the last 16 bytes of `bytes`.
///
/// # Panics
///
/// Panics if `bytes` is shorter than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    const MARKER_LEN: usize = 16;
    // Exactly 16 bytes is enough to hold a full marker, so only shorter input is rejected.
    assert!(
        bytes.len() >= MARKER_LEN,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; MARKER_LEN];
    // `u8` is `Copy`, so a plain copy (memcpy) is sufficient.
    marker.copy_from_slice(&bytes[bytes.len() - MARKER_LEN..]);
    marker
}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Record;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use std::io::Cursor;

    /// Record schema matching `ENCODED`; field `a` carries a default of 42.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// A container written with `SCHEMA` and the `null` codec.
    ///
    /// Layout: a 162-byte header (magic, metadata, 16-byte marker), then one block
    /// holding two records, {a: 27, b: "foo"} and {a: 42, b: "bar"}. The block consists of
    /// the count (2), the size (10), the 10 payload bytes, and the trailing marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::builder(ENCODED).reader_schema(&schema).build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Collecting the whole stream also checks that exactly two values are produced
        // and that none of them is an error; the old enumerate loop passed on an empty stream.
        let actual = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(actual, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        assert!(
            Reader::builder(&mut invalid)
                .reader_schema(&schema)
                .build()
                .is_err()
        );

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        // Keeps 9 of the block's bytes: count, size and only part of the 10-byte payload.
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::builder(&mut invalid)
            .reader_schema(&schema)
            .build()?;

        // The reader stops after the first error, so exactly one (failed) item is expected.
        let results: Vec<AvroResult<Value>> = reader.collect();
        assert_eq!(results.len(), 1);
        assert!(results[0].is_err());

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        // The header is 162 bytes; the 3 extra bytes make the block's count and size
        // readable while its payload is missing.
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;

        let results: Vec<AvroResult<Value>> = reader.collect();
        assert_eq!(results.len(), 1);
        assert!(results[0].is_err());

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro: &[u8] = &[
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        // `Reader` is not `Debug`, so `unwrap_err` is unavailable; match explicitly.
        match Reader::new(snappy_compressed_avro) {
            Err(err) => {
                assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string())
            }
            Ok(_) => panic!("Expected an error in the reading of the codec!"),
        }
    }
}