apache_avro/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19use crate::{
20    decode::{decode, decode_internal},
21    from_value,
22    headers::{HeaderBuilder, RabinFingerprintHeader},
23    schema::{
24        resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25        ResolvedSchema, Schema,
26    },
27    types::Value,
28    util, AvroResult, Codec, Error,
29};
30use log::warn;
31use serde::de::DeserializeOwned;
32use serde_json::from_slice;
33use std::{
34    collections::HashMap,
35    io::{ErrorKind, Read},
36    marker::PhantomData,
37    str::FromStr,
38};
39
/// Internal Block reader.
///
/// Owns the underlying byte source together with everything learned from the
/// container header, and buffers one data block at a time so that the values
/// inside it can be decoded one by one.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Byte source the header and the data blocks are read from.
    reader: R,
    /// Internal buffering to reduce allocation.
    buf: Vec<u8>,
    /// Offset into `buf` at which the next value starts.
    buf_idx: usize,
    /// Number of elements expected to exist within this block.
    message_count: usize,
    /// Marker read at the end of the header; the 16 bytes following every data
    /// block are compared against it.
    marker: [u8; 16],
    /// Codec taken from the header metadata; used to decompress each block.
    codec: Codec,
    /// Writer schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Caller-supplied schemata made available while parsing the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header entries whose key does not start with `avro.`.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Resolved named schemas handed to the decoder.
    names_refs: Names,
}
56
57impl<'r, R: Read> Block<'r, R> {
58    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
59        let mut block = Block {
60            reader,
61            codec: Codec::Null,
62            writer_schema: Schema::Null,
63            schemata,
64            buf: vec![],
65            buf_idx: 0,
66            message_count: 0,
67            marker: [0; 16],
68            user_metadata: Default::default(),
69            names_refs: Default::default(),
70        };
71
72        block.read_header()?;
73        Ok(block)
74    }
75
76    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
77    /// its content.
78    fn read_header(&mut self) -> AvroResult<()> {
79        let mut buf = [0u8; 4];
80        self.reader
81            .read_exact(&mut buf)
82            .map_err(Error::ReadHeader)?;
83
84        if buf != [b'O', b'b', b'j', 1u8] {
85            return Err(Error::HeaderMagic);
86        }
87
88        let meta_schema = Schema::map(Schema::Bytes);
89        if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90            self.read_writer_schema(&metadata)?;
91            self.codec = read_codec(&metadata)?;
92
93            for (key, value) in metadata {
94                if key == "avro.schema"
95                    || key == "avro.codec"
96                    || key == "avro.codec.compression_level"
97                {
98                    // already processed
99                } else if key.starts_with("avro.") {
100                    warn!("Ignoring unknown metadata key: {key}");
101                } else {
102                    self.read_user_metadata(key, value);
103                }
104            }
105        } else {
106            return Err(Error::GetHeaderMetadata);
107        }
108
109        self.reader
110            .read_exact(&mut self.marker)
111            .map_err(Error::ReadMarker)
112    }
113
114    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
115        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
116        // invalid bytes.
117        //
118        // The are two cases to handle here:
119        //
120        // 1. `n > self.buf.len()`:
121        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
122        // 2. `n < self.buf.len()`:
123        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
124        //
125        // TODO: Figure out a way to avoid having to truncate for the second case.
126        self.buf.resize(util::safe_len(n)?, 0);
127        self.reader
128            .read_exact(&mut self.buf)
129            .map_err(Error::ReadIntoBuf)?;
130        self.buf_idx = 0;
131        Ok(())
132    }
133
134    /// Try to read a data block, also performing schema resolution for the objects contained in
135    /// the block. The objects are stored in an internal buffer to the `Reader`.
136    fn read_block_next(&mut self) -> AvroResult<()> {
137        assert!(self.is_empty(), "Expected self to be empty!");
138        match util::read_long(&mut self.reader) {
139            Ok(block_len) => {
140                self.message_count = block_len as usize;
141                let block_bytes = util::read_long(&mut self.reader)?;
142                self.fill_buf(block_bytes as usize)?;
143                let mut marker = [0u8; 16];
144                self.reader
145                    .read_exact(&mut marker)
146                    .map_err(Error::ReadBlockMarker)?;
147
148                if marker != self.marker {
149                    return Err(Error::GetBlockMarker);
150                }
151
152                // NOTE (JAB): This doesn't fit this Reader pattern very well.
153                // `self.buf` is a growable buffer that is reused as the reader is iterated.
154                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
155                // and replace `buf` with the new one, instead of reusing the same buffer.
156                // We can address this by using some "limited read" type to decode directly
157                // into the buffer. But this is fine, for now.
158                self.codec.decompress(&mut self.buf)
159            }
160            Err(Error::ReadVariableIntegerBytes(io_err)) => {
161                if let ErrorKind::UnexpectedEof = io_err.kind() {
162                    // to not return any error in case we only finished to read cleanly from the stream
163                    Ok(())
164                } else {
165                    Err(Error::ReadVariableIntegerBytes(io_err))
166                }
167            }
168            Err(e) => Err(e),
169        }
170    }
171
172    fn len(&self) -> usize {
173        self.message_count
174    }
175
176    fn is_empty(&self) -> bool {
177        self.len() == 0
178    }
179
180    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
181        if self.is_empty() {
182            self.read_block_next()?;
183            if self.is_empty() {
184                return Ok(None);
185            }
186        }
187
188        let mut block_bytes = &self.buf[self.buf_idx..];
189        let b_original = block_bytes.len();
190
191        let item = decode_internal(
192            &self.writer_schema,
193            &self.names_refs,
194            &None,
195            &mut block_bytes,
196        )?;
197        let item = match read_schema {
198            Some(schema) => item.resolve(schema)?,
199            None => item,
200        };
201
202        if b_original != 0 && b_original == block_bytes.len() {
203            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
204            return Err(Error::ReadBlock);
205        }
206        self.buf_idx += b_original - block_bytes.len();
207        self.message_count -= 1;
208        Ok(Some(item))
209    }
210
211    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
212        let json: serde_json::Value = metadata
213            .get("avro.schema")
214            .and_then(|bytes| {
215                if let Value::Bytes(ref bytes) = *bytes {
216                    from_slice(bytes.as_ref()).ok()
217                } else {
218                    None
219                }
220            })
221            .ok_or(Error::GetAvroSchemaFromMap)?;
222        if !self.schemata.is_empty() {
223            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
224            let names: Names = rs
225                .get_names()
226                .iter()
227                .map(|(name, schema)| (name.clone(), (*schema).clone()))
228                .collect();
229            self.writer_schema = Schema::parse_with_names(&json, names)?;
230            resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
231        } else {
232            self.writer_schema = Schema::parse(&json)?;
233            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
234        }
235        Ok(())
236    }
237
238    fn read_user_metadata(&mut self, key: String, value: Value) {
239        match value {
240            Value::Bytes(ref vec) => {
241                self.user_metadata.insert(key, vec.clone());
242            }
243            wrong => {
244                warn!("User metadata values must be Value::Bytes, found {wrong:?}");
245            }
246        }
247    }
248}
249
250fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
251    let result = metadata
252        .get("avro.codec")
253        .map(|codec| {
254            if let Value::Bytes(ref bytes) = *codec {
255                match std::str::from_utf8(bytes.as_ref()) {
256                    Ok(utf8) => Ok(utf8),
257                    Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
258                }
259            } else {
260                Err(Error::BadCodecMetadata)
261            }
262        })
263        .map(|codec_res| match codec_res {
264            Ok(codec) => match Codec::from_str(codec) {
265                Ok(codec) => match codec {
266                    #[cfg(feature = "bzip")]
267                    Codec::Bzip2(_) => {
268                        use crate::Bzip2Settings;
269                        if let Some(Value::Bytes(bytes)) =
270                            metadata.get("avro.codec.compression_level")
271                        {
272                            Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
273                        } else {
274                            Ok(codec)
275                        }
276                    }
277                    #[cfg(feature = "xz")]
278                    Codec::Xz(_) => {
279                        use crate::XzSettings;
280                        if let Some(Value::Bytes(bytes)) =
281                            metadata.get("avro.codec.compression_level")
282                        {
283                            Ok(Codec::Xz(XzSettings::new(bytes[0])))
284                        } else {
285                            Ok(codec)
286                        }
287                    }
288                    #[cfg(feature = "zstandard")]
289                    Codec::Zstandard(_) => {
290                        use crate::ZstandardSettings;
291                        if let Some(Value::Bytes(bytes)) =
292                            metadata.get("avro.codec.compression_level")
293                        {
294                            Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
295                        } else {
296                            Ok(codec)
297                        }
298                    }
299                    _ => Ok(codec),
300                },
301                Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
302            },
303            Err(err) => Err(err),
304        });
305
306    result.unwrap_or(Ok(Codec::Null))
307}
308
/// Main interface for reading Avro formatted values.
///
/// To be used as an iterator:
///
/// ```no_run
/// # use apache_avro::Reader;
/// # use std::io::Cursor;
/// # let input = Cursor::new(Vec::<u8>::new());
/// for value in Reader::new(input).unwrap() {
///     match value {
///         Ok(v) => println!("{:?}", v),
///         Err(e) => println!("Error: {}", e),
///     };
/// }
/// ```
pub struct Reader<'a, R> {
    /// Block reader holding the byte source and the header information.
    block: Block<'a, R>,
    /// Reader schema supplied by the caller, if any.
    reader_schema: Option<&'a Schema>,
    /// Set once iteration has yielded an error; iteration then stops.
    errored: bool,
    /// `true` when a reader schema was supplied and it differs from the writer schema, so that
    /// decoded values have to be resolved against it.
    should_resolve_schema: bool,
}
330
331impl<'a, R: Read> Reader<'a, R> {
332    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
333    /// No reader `Schema` will be set.
334    ///
335    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
336    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
337        let block = Block::new(reader, vec![])?;
338        let reader = Reader {
339            block,
340            reader_schema: None,
341            errored: false,
342            should_resolve_schema: false,
343        };
344        Ok(reader)
345    }
346
347    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
348    /// to read from.
349    ///
350    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
351    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
352        let block = Block::new(reader, vec![schema])?;
353        let mut reader = Reader {
354            block,
355            reader_schema: Some(schema),
356            errored: false,
357            should_resolve_schema: false,
358        };
359        // Check if the reader and writer schemas disagree.
360        reader.should_resolve_schema = reader.writer_schema() != schema;
361        Ok(reader)
362    }
363
364    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
365    /// to read from.
366    ///
367    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
368    pub fn with_schemata(
369        schema: &'a Schema,
370        schemata: Vec<&'a Schema>,
371        reader: R,
372    ) -> AvroResult<Reader<'a, R>> {
373        let block = Block::new(reader, schemata)?;
374        let mut reader = Reader {
375            block,
376            reader_schema: Some(schema),
377            errored: false,
378            should_resolve_schema: false,
379        };
380        // Check if the reader and writer schemas disagree.
381        reader.should_resolve_schema = reader.writer_schema() != schema;
382        Ok(reader)
383    }
384
385    /// Get a reference to the writer `Schema`.
386    #[inline]
387    pub fn writer_schema(&self) -> &Schema {
388        &self.block.writer_schema
389    }
390
391    /// Get a reference to the optional reader `Schema`.
392    #[inline]
393    pub fn reader_schema(&self) -> Option<&Schema> {
394        self.reader_schema
395    }
396
397    /// Get a reference to the user metadata
398    #[inline]
399    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
400        &self.block.user_metadata
401    }
402
403    #[inline]
404    fn read_next(&mut self) -> AvroResult<Option<Value>> {
405        let read_schema = if self.should_resolve_schema {
406            self.reader_schema
407        } else {
408            None
409        };
410
411        self.block.read_next(read_schema)
412    }
413}
414
415impl<R: Read> Iterator for Reader<'_, R> {
416    type Item = AvroResult<Value>;
417
418    fn next(&mut self) -> Option<Self::Item> {
419        // to prevent keep on reading after the first error occurs
420        if self.errored {
421            return None;
422        };
423        match self.read_next() {
424            Ok(opt) => opt.map(Ok),
425            Err(e) => {
426                self.errored = true;
427                Some(Err(e))
428            }
429        }
430    }
431}
432
433/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
434/// to read from.
435///
436/// In case a reader `Schema` is provided, schema resolution will also be performed.
437///
438/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
439/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
440/// you are doing, instead.
441pub fn from_avro_datum<R: Read>(
442    writer_schema: &Schema,
443    reader: &mut R,
444    reader_schema: Option<&Schema>,
445) -> AvroResult<Value> {
446    let value = decode(writer_schema, reader)?;
447    match reader_schema {
448        Some(schema) => value.resolve(schema),
449        None => Ok(value),
450    }
451}
452
453/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
454/// to read from.
455/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
456/// schemata to resolve any dependencies.
457///
458/// In case a reader `Schema` is provided, schema resolution will also be performed.
459pub fn from_avro_datum_schemata<R: Read>(
460    writer_schema: &Schema,
461    writer_schemata: Vec<&Schema>,
462    reader: &mut R,
463    reader_schema: Option<&Schema>,
464) -> AvroResult<Value> {
465    from_avro_datum_reader_schemata(
466        writer_schema,
467        writer_schemata,
468        reader,
469        reader_schema,
470        Vec::with_capacity(0),
471    )
472}
473
474/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
475/// to read from.
476/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
477/// schemata to resolve any dependencies.
478///
479/// In case a reader `Schema` is provided, schema resolution will also be performed.
480pub fn from_avro_datum_reader_schemata<R: Read>(
481    writer_schema: &Schema,
482    writer_schemata: Vec<&Schema>,
483    reader: &mut R,
484    reader_schema: Option<&Schema>,
485    reader_schemata: Vec<&Schema>,
486) -> AvroResult<Value> {
487    let rs = ResolvedSchema::try_from(writer_schemata)?;
488    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
489    match reader_schema {
490        Some(schema) => {
491            if reader_schemata.is_empty() {
492                value.resolve(schema)
493            } else {
494                value.resolve_schemata(schema, reader_schemata)
495            }
496        }
497        None => Ok(value),
498    }
499}
500
/// Reader for messages that consist of a fixed header followed by one encoded value, producing
/// a generic [`Value`].
pub struct GenericSingleObjectReader {
    /// Schema the values are decoded with, together with its resolved names.
    write_schema: ResolvedOwnedSchema,
    /// Header bytes every message is expected to start with.
    expected_header: Vec<u8>,
}
505
506impl GenericSingleObjectReader {
507    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
508        let header_builder = RabinFingerprintHeader::from_schema(&schema);
509        Self::new_with_header_builder(schema, header_builder)
510    }
511
512    pub fn new_with_header_builder<HB: HeaderBuilder>(
513        schema: Schema,
514        header_builder: HB,
515    ) -> AvroResult<GenericSingleObjectReader> {
516        let expected_header = header_builder.build_header();
517        Ok(GenericSingleObjectReader {
518            write_schema: ResolvedOwnedSchema::try_from(schema)?,
519            expected_header,
520        })
521    }
522
523    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
524        let mut header = vec![0; self.expected_header.len()];
525        match reader.read_exact(&mut header) {
526            Ok(_) => {
527                if self.expected_header == header {
528                    decode_internal(
529                        self.write_schema.get_root_schema(),
530                        self.write_schema.get_names(),
531                        &None,
532                        reader,
533                    )
534                } else {
535                    Err(Error::SingleObjectHeaderMismatch(
536                        self.expected_header.clone(),
537                        header,
538                    ))
539                }
540            }
541            Err(io_error) => Err(Error::ReadHeader(io_error)),
542        }
543    }
544}
545
/// Typed wrapper around [`GenericSingleObjectReader`] that uses the schema provided by `T`
/// and converts each decoded value into a `T`.
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Generic reader doing the header check and the decoding.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a `T`.
    _model: PhantomData<T>,
}
553
554impl<T> SpecificSingleObjectReader<T>
555where
556    T: AvroSchema,
557{
558    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
559        Ok(SpecificSingleObjectReader {
560            inner: GenericSingleObjectReader::new(T::get_schema())?,
561            _model: PhantomData,
562        })
563    }
564}
565
566impl<T> SpecificSingleObjectReader<T>
567where
568    T: AvroSchema + From<Value>,
569{
570    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
571        self.inner.read_value(reader).map(|v| v.into())
572    }
573}
574
575impl<T> SpecificSingleObjectReader<T>
576where
577    T: AvroSchema + DeserializeOwned,
578{
579    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
580        from_value::<T>(&self.inner.read_value(reader)?)
581    }
582}
583
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`
///
/// The marker is taken to be the last 16 bytes of `bytes`.
///
/// # Panics
///
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    // 16 bytes are exactly enough to hold a marker, so only shorter inputs are rejected.
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[(bytes.len() - 16)..]);
    marker
}
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
599    use apache_avro_test_helper::TestResult;
600    use pretty_assertions::assert_eq;
601    use serde::Deserialize;
602    use std::io::Cursor;
603    use uuid::Uuid;
604
605    const SCHEMA: &str = r#"
606    {
607      "type": "record",
608      "name": "test",
609      "fields": [
610        {
611          "name": "a",
612          "type": "long",
613          "default": 42
614        },
615        {
616          "name": "b",
617          "type": "string"
618        }
619      ]
620    }
621    "#;
622    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
623    const ENCODED: &[u8] = &[
624        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
625        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
626        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
627        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
628        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
629        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
630        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
631        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
632        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
633        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
634        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
635        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
636        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
637        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
638    ];
639
640    #[test]
641    fn test_from_avro_datum() -> TestResult {
642        let schema = Schema::parse_str(SCHEMA)?;
643        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
644
645        let mut record = Record::new(&schema).unwrap();
646        record.put("a", 27i64);
647        record.put("b", "foo");
648        let expected = record.into();
649
650        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
651
652        Ok(())
653    }
654
655    #[test]
656    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
657        const TEST_RECORD_SCHEMA_3240: &str = r#"
658    {
659      "type": "record",
660      "name": "test",
661      "fields": [
662        {
663          "name": "a",
664          "type": "long",
665          "default": 42
666        },
667        {
668          "name": "b",
669          "type": "string"
670        },
671        {
672            "name": "a_nullable_array",
673            "type": ["null", {"type": "array", "items": {"type": "string"}}],
674            "default": null
675        },
676        {
677            "name": "a_nullable_boolean",
678            "type": ["null", {"type": "boolean"}],
679            "default": null
680        },
681        {
682            "name": "a_nullable_string",
683            "type": ["null", {"type": "string"}],
684            "default": null
685        }
686      ]
687    }
688    "#;
689        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
690        struct TestRecord3240 {
691            a: i64,
692            b: String,
693            a_nullable_array: Option<Vec<String>>,
694            // we are missing the 'a_nullable_boolean' field to simulate missing keys
695            // a_nullable_boolean: Option<bool>,
696            a_nullable_string: Option<String>,
697        }
698
699        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
700        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
701
702        let expected_record: TestRecord3240 = TestRecord3240 {
703            a: 27i64,
704            b: String::from("foo"),
705            a_nullable_array: None,
706            a_nullable_string: None,
707        };
708
709        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
710        let parsed_record: TestRecord3240 = match &avro_datum {
711            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
712            unexpected => {
713                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
714            }
715        };
716
717        assert_eq!(parsed_record, expected_record);
718
719        Ok(())
720    }
721
722    #[test]
723    fn test_null_union() -> TestResult {
724        let schema = Schema::parse_str(UNION_SCHEMA)?;
725        let mut encoded: &'static [u8] = &[2, 0];
726
727        assert_eq!(
728            from_avro_datum(&schema, &mut encoded, None)?,
729            Value::Union(1, Box::new(Value::Long(0)))
730        );
731
732        Ok(())
733    }
734
735    #[test]
736    fn test_reader_iterator() -> TestResult {
737        let schema = Schema::parse_str(SCHEMA)?;
738        let reader = Reader::with_schema(&schema, ENCODED)?;
739
740        let mut record1 = Record::new(&schema).unwrap();
741        record1.put("a", 27i64);
742        record1.put("b", "foo");
743
744        let mut record2 = Record::new(&schema).unwrap();
745        record2.put("a", 42i64);
746        record2.put("b", "bar");
747
748        let expected = [record1.into(), record2.into()];
749
750        for (i, value) in reader.enumerate() {
751            assert_eq!(value?, expected[i]);
752        }
753
754        Ok(())
755    }
756
757    #[test]
758    fn test_reader_invalid_header() -> TestResult {
759        let schema = Schema::parse_str(SCHEMA)?;
760        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
761        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
762
763        Ok(())
764    }
765
766    #[test]
767    fn test_reader_invalid_block() -> TestResult {
768        let schema = Schema::parse_str(SCHEMA)?;
769        let invalid = ENCODED
770            .iter()
771            .copied()
772            .rev()
773            .skip(19)
774            .collect::<Vec<u8>>()
775            .into_iter()
776            .rev()
777            .collect::<Vec<u8>>();
778        let reader = Reader::with_schema(&schema, &invalid[..])?;
779        for value in reader {
780            assert!(value.is_err());
781        }
782
783        Ok(())
784    }
785
786    #[test]
787    fn test_reader_empty_buffer() -> TestResult {
788        let empty = Cursor::new(Vec::new());
789        assert!(Reader::new(empty).is_err());
790
791        Ok(())
792    }
793
794    #[test]
795    fn test_reader_only_header() -> TestResult {
796        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
797        let reader = Reader::new(&invalid[..])?;
798        for value in reader {
799            assert!(value.is_err());
800        }
801
802        Ok(())
803    }
804
805    #[test]
806    fn test_avro_3405_read_user_metadata_success() -> TestResult {
807        use crate::writer::Writer;
808
809        let schema = Schema::parse_str(SCHEMA)?;
810        let mut writer = Writer::new(&schema, Vec::new());
811
812        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
813        user_meta_data.insert(
814            "stringKey".to_string(),
815            "stringValue".to_string().into_bytes(),
816        );
817        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
818        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
819
820        for (k, v) in user_meta_data.iter() {
821            writer.add_user_metadata(k.to_string(), v)?;
822        }
823
824        let mut record = Record::new(&schema).unwrap();
825        record.put("a", 27i64);
826        record.put("b", "foo");
827
828        writer.append(record.clone())?;
829        writer.append(record.clone())?;
830        writer.flush()?;
831        let result = writer.into_inner()?;
832
833        let reader = Reader::new(&result[..])?;
834        assert_eq!(reader.user_metadata(), &user_meta_data);
835
836        Ok(())
837    }
838
839    #[derive(Deserialize, Clone, PartialEq, Debug)]
840    struct TestSingleObjectReader {
841        a: i64,
842        b: f64,
843        c: Vec<String>,
844    }
845
846    impl AvroSchema for TestSingleObjectReader {
847        fn get_schema() -> Schema {
848            let schema = r#"
849            {
850                "type":"record",
851                "name":"TestSingleObjectWrtierSerialize",
852                "fields":[
853                    {
854                        "name":"a",
855                        "type":"long"
856                    },
857                    {
858                        "name":"b",
859                        "type":"double"
860                    },
861                    {
862                        "name":"c",
863                        "type":{
864                            "type":"array",
865                            "items":"string"
866                        }
867                    }
868                ]
869            }
870            "#;
871            Schema::parse_str(schema).unwrap()
872        }
873    }
874
875    impl From<Value> for TestSingleObjectReader {
876        fn from(obj: Value) -> TestSingleObjectReader {
877            if let Value::Record(fields) = obj {
878                let mut a = None;
879                let mut b = None;
880                let mut c = vec![];
881                for (field_name, v) in fields {
882                    match (field_name.as_str(), v) {
883                        ("a", Value::Long(i)) => a = Some(i),
884                        ("b", Value::Double(d)) => b = Some(d),
885                        ("c", Value::Array(v)) => {
886                            for inner_val in v {
887                                if let Value::String(s) = inner_val {
888                                    c.push(s);
889                                }
890                            }
891                        }
892                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
893                    }
894                }
895                TestSingleObjectReader {
896                    a: a.unwrap(),
897                    b: b.unwrap(),
898                    c,
899                }
900            } else {
901                panic!("Expected a Value::Record but was {obj:?}")
902            }
903        }
904    }
905
906    impl From<TestSingleObjectReader> for Value {
907        fn from(obj: TestSingleObjectReader) -> Value {
908            Value::Record(vec![
909                ("a".into(), obj.a.into()),
910                ("b".into(), obj.b.into()),
911                (
912                    "c".into(),
913                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
914                ),
915            ])
916        }
917    }
918
919    #[test]
920    fn test_avro_3507_single_object_reader() -> TestResult {
921        let obj = TestSingleObjectReader {
922            a: 42,
923            b: 3.33,
924            c: vec!["cat".into(), "dog".into()],
925        };
926        let mut to_read = Vec::<u8>::new();
927        to_read.extend_from_slice(&[0xC3, 0x01]);
928        to_read.extend_from_slice(
929            &TestSingleObjectReader::get_schema()
930                .fingerprint::<Rabin>()
931                .bytes[..],
932        );
933        encode(
934            &obj.clone().into(),
935            &TestSingleObjectReader::get_schema(),
936            &mut to_read,
937        )
938        .expect("Encode should succeed");
939        let mut to_read = &to_read[..];
940        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
941            .expect("Schema should resolve");
942        let val = generic_reader
943            .read_value(&mut to_read)
944            .expect("Should read");
945        let expected_value: Value = obj.into();
946        assert_eq!(expected_value, val);
947
948        Ok(())
949    }
950
951    #[test]
952    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
953        let obj = TestSingleObjectReader {
954            a: 42,
955            b: 3.33,
956            c: vec!["cat".into(), "dog".into()],
957        };
958        // The two-byte marker, to show that the message uses this single-record format
959        let to_read_1 = [0xC3, 0x01];
960        let mut to_read_2 = Vec::<u8>::new();
961        to_read_2.extend_from_slice(
962            &TestSingleObjectReader::get_schema()
963                .fingerprint::<Rabin>()
964                .bytes[..],
965        );
966        let mut to_read_3 = Vec::<u8>::new();
967        encode(
968            &obj.clone().into(),
969            &TestSingleObjectReader::get_schema(),
970            &mut to_read_3,
971        )
972        .expect("Encode should succeed");
973        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
974        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
975            .expect("Schema should resolve");
976        let val = generic_reader
977            .read_value(&mut to_read)
978            .expect("Should read");
979        let expected_value: Value = obj.into();
980        assert_eq!(expected_value, val);
981
982        Ok(())
983    }
984
985    #[test]
986    fn test_avro_3507_reader_parity() -> TestResult {
987        let obj = TestSingleObjectReader {
988            a: 42,
989            b: 3.33,
990            c: vec!["cat".into(), "dog".into()],
991        };
992
993        let mut to_read = Vec::<u8>::new();
994        to_read.extend_from_slice(&[0xC3, 0x01]);
995        to_read.extend_from_slice(
996            &TestSingleObjectReader::get_schema()
997                .fingerprint::<Rabin>()
998                .bytes[..],
999        );
1000        encode(
1001            &obj.clone().into(),
1002            &TestSingleObjectReader::get_schema(),
1003            &mut to_read,
1004        )
1005        .expect("Encode should succeed");
1006        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
1007            .expect("Schema should resolve");
1008        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
1009            .expect("schema should resolve");
1010        let mut to_read1 = &to_read[..];
1011        let mut to_read2 = &to_read[..];
1012        let mut to_read3 = &to_read[..];
1013
1014        let val = generic_reader
1015            .read_value(&mut to_read1)
1016            .expect("Should read");
1017        let read_obj1 = specific_reader
1018            .read_from_value(&mut to_read2)
1019            .expect("Should read from value");
1020        let read_obj2 = specific_reader
1021            .read(&mut to_read3)
1022            .expect("Should read from deserilize");
1023        let expected_value: Value = obj.clone().into();
1024        assert_eq!(obj, read_obj1);
1025        assert_eq!(obj, read_obj2);
1026        assert_eq!(val, expected_value);
1027
1028        Ok(())
1029    }
1030
1031    #[test]
1032    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
1033        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1034        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1035        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
1036            TestSingleObjectReader::get_schema(),
1037            header_builder,
1038        )
1039        .expect("failed to build reader");
1040        let data_to_read: Vec<u8> = vec![
1041            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
1042        ];
1043        let mut to_read = &data_to_read[..];
1044        let read_result = generic_reader.read_value(&mut to_read);
1045        matches!(read_result, Err(crate::Error::ReadBytes(_)));
1046        Ok(())
1047    }
1048
1049    #[cfg(not(feature = "snappy"))]
1050    #[test]
1051    fn test_avro_3549_read_not_enabled_codec() {
1052        let snappy_compressed_avro = vec![
1053            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
1054            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
1055            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
1056            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
1057            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
1058            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
1059            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
1060            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
1061            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
1062            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
1063        ];
1064
1065        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
1066            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
1067        } else {
1068            panic!("Expected an error in the reading of the codec!");
1069        }
1070    }
1071}