Skip to main content

apache_avro/reader/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19
20mod block;
21pub mod datum;
22pub mod single_object;
23
24use std::{collections::HashMap, io::Read, marker::PhantomData};
25
26use block::Block;
27use bon::bon;
28use serde::de::DeserializeOwned;
29
30use crate::{AvroResult, schema::Schema, types::Value, util::is_human_readable};
31
32/// Main interface for reading Avro formatted values.
33///
34/// To be used as an iterator:
35///
36/// ```no_run
37/// # use apache_avro::Reader;
38/// # use std::io::Cursor;
39/// # let input = Cursor::new(Vec::<u8>::new());
40/// for value in Reader::new(input).unwrap() {
41///     match value {
42///         Ok(v) => println!("{:?}", v),
43///         Err(e) => println!("Error: {}", e),
44///     };
45/// }
46/// ```
47pub struct Reader<'a, R> {
48    block: Block<'a, R>,
49    reader_schema: Option<&'a Schema>,
50    errored: bool,
51    should_resolve_schema: bool,
52}
53
54#[bon]
55impl<'a, R: Read> Reader<'a, R> {
56    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
57    /// No reader `Schema` will be set.
58    ///
59    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
60    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
61        Reader::builder(reader).build()
62    }
63
64    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
65    /// With an optional reader `Schema` and optional schemata to use for resolving schema
66    /// references.
67    ///
68    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
69    #[builder(finish_fn = build)]
70    pub fn builder(
71        #[builder(start_fn)] reader: R,
72        /// The schema the written data needs to be adapted to.
73        ///
74        /// This is currently not compatible with [`ReaderDeser`] and will panic during reading if the
75        /// writer schema is not the same as the reader schema.
76        reader_schema: Option<&'a Schema>,
77        schemata: Option<Vec<&'a Schema>>,
78        #[builder(default = is_human_readable())] human_readable: bool,
79    ) -> AvroResult<Reader<'a, R>> {
80        let schemata =
81            schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default());
82
83        let block = Block::new(reader, schemata, human_readable)?;
84        let mut reader = Reader {
85            block,
86            reader_schema,
87            errored: false,
88            should_resolve_schema: false,
89        };
90        // Check if the reader and writer schemas disagree.
91        reader.should_resolve_schema =
92            reader_schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema);
93        Ok(reader)
94    }
95
96    /// Get a reference to the writer `Schema`.
97    #[inline]
98    pub fn writer_schema(&self) -> &Schema {
99        &self.block.writer_schema
100    }
101
102    /// Get a reference to the optional reader `Schema`.
103    #[inline]
104    pub fn reader_schema(&self) -> Option<&Schema> {
105        self.reader_schema
106    }
107
108    /// Get a reference to the user metadata.
109    #[inline]
110    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
111        &self.block.user_metadata
112    }
113
114    /// Convert this reader into an iterator that deserializes to `T`.
115    pub fn into_deser_iter<T: DeserializeOwned>(self) -> ReaderDeser<'a, R, T> {
116        ReaderDeser {
117            inner: self,
118            phantom: PhantomData,
119        }
120    }
121
122    #[inline]
123    fn read_next(&mut self) -> AvroResult<Option<Value>> {
124        let read_schema = if self.should_resolve_schema {
125            self.reader_schema
126        } else {
127            None
128        };
129
130        self.block.read_next(read_schema)
131    }
132
133    fn read_next_deser<T: DeserializeOwned>(&mut self) -> AvroResult<Option<T>> {
134        // TODO: Implement SchemaAwareResolvingDeserializer
135        assert!(
136            !self.should_resolve_schema,
137            "Schema aware deserialisation does not resolve schemas yet"
138        );
139
140        self.block.read_next_deser(self.reader_schema)
141    }
142}
143
144impl<R: Read> Iterator for Reader<'_, R> {
145    type Item = AvroResult<Value>;
146
147    fn next(&mut self) -> Option<Self::Item> {
148        // to prevent keep on reading after the first error occurs
149        if self.errored {
150            return None;
151        };
152        match self.read_next() {
153            Ok(opt) => opt.map(Ok),
154            Err(e) => {
155                self.errored = true;
156                Some(Err(e))
157            }
158        }
159    }
160}
161
162/// Wrapper around [`Reader`] where the iterator deserializes `T`.
163pub struct ReaderDeser<'a, R, T> {
164    inner: Reader<'a, R>,
165    phantom: PhantomData<T>,
166}
167
168impl<R: Read, T: DeserializeOwned> Iterator for ReaderDeser<'_, R, T> {
169    type Item = AvroResult<T>;
170
171    fn next(&mut self) -> Option<Self::Item> {
172        // Don't continue when we've errored before
173        if self.inner.errored {
174            return None;
175        }
176        match self.inner.read_next_deser::<T>() {
177            Ok(opt) => opt.map(Ok),
178            Err(e) => {
179                self.inner.errored = true;
180                Some(Err(e))
181            }
182        }
183    }
184}
185
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`.
///
/// The marker is the last 16 bytes of `bytes`.
///
/// # Panics
/// Will panic if `bytes` does not contain at least 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    // `>=`, not `>`: a slice of exactly 16 bytes satisfies the documented contract.
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
199
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::types::Record;

    /// Schema of the records stored in `ENCODED`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// Data written with `SCHEMA` and the `null` codec. It has a 163-byte header, then one
    /// block (count byte, size byte, 10 data bytes) holding `{a: 27, b: "foo"}` and
    /// `{a: 42, b: "bar"}`, then the trailing 16-byte marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::builder(ENCODED).reader_schema(&schema).build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Compare the complete output so a reader that yields too few (or zero) values
        // fails instead of passing vacuously.
        let actual = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(actual, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        assert!(
            Reader::builder(&mut invalid)
                .reader_schema(&schema)
                .build()
                .is_err()
        );

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        // Drop the trailing marker and the last three data bytes of the block.
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::builder(&mut invalid)
            .reader_schema(&schema)
            .build()?;
        let results: Vec<_> = reader.collect();
        assert!(
            !results.is_empty(),
            "expected the truncated block to yield an error"
        );
        assert!(results.iter().all(Result::is_err));

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        // The header plus the block's count and size bytes, but none of its data.
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;
        let results: Vec<_> = reader.collect();
        assert!(
            !results.is_empty(),
            "expected the missing block data to yield an error"
        );
        assert!(results.iter().all(Result::is_err));

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro: &[u8] = &[
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        // `Reader` does not implement `Debug`, so `unwrap_err` is unavailable here.
        let Err(err) = Reader::new(snappy_compressed_avro) else {
            panic!("Expected an error in the reading of the codec!");
        };
        assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
    }
}