apache_avro/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19use crate::{
20    decode::{decode, decode_internal},
21    from_value,
22    headers::{HeaderBuilder, RabinFingerprintHeader},
23    schema::{
24        resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25        ResolvedSchema, Schema,
26    },
27    types::Value,
28    util, AvroResult, Codec, Error,
29};
30use log::warn;
31use serde::de::DeserializeOwned;
32use serde_json::from_slice;
33use std::{
34    collections::HashMap,
35    io::{ErrorKind, Read},
36    marker::PhantomData,
37    str::FromStr,
38};
39
40/// Internal Block reader.
41#[derive(Debug, Clone)]
42struct Block<'r, R> {
43    reader: R,
44    /// Internal buffering to reduce allocation.
45    buf: Vec<u8>,
46    buf_idx: usize,
47    /// Number of elements expected to exist within this block.
48    message_count: usize,
49    marker: [u8; 16],
50    codec: Codec,
51    writer_schema: Schema,
52    schemata: Vec<&'r Schema>,
53    user_metadata: HashMap<String, Vec<u8>>,
54    names_refs: Names,
55}
56
57impl<'r, R: Read> Block<'r, R> {
58    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
59        let mut block = Block {
60            reader,
61            codec: Codec::Null,
62            writer_schema: Schema::Null,
63            schemata,
64            buf: vec![],
65            buf_idx: 0,
66            message_count: 0,
67            marker: [0; 16],
68            user_metadata: Default::default(),
69            names_refs: Default::default(),
70        };
71
72        block.read_header()?;
73        Ok(block)
74    }
75
76    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
77    /// its content.
78    fn read_header(&mut self) -> AvroResult<()> {
79        let mut buf = [0u8; 4];
80        self.reader
81            .read_exact(&mut buf)
82            .map_err(Error::ReadHeader)?;
83
84        if buf != [b'O', b'b', b'j', 1u8] {
85            return Err(Error::HeaderMagic);
86        }
87
88        let meta_schema = Schema::map(Schema::Bytes);
89        if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90            self.read_writer_schema(&metadata)?;
91            self.codec = read_codec(&metadata)?;
92
93            for (key, value) in metadata {
94                if key == "avro.schema"
95                    || key == "avro.codec"
96                    || key == "avro.codec.compression_level"
97                {
98                    // already processed
99                } else if key.starts_with("avro.") {
100                    warn!("Ignoring unknown metadata key: {}", key);
101                } else {
102                    self.read_user_metadata(key, value);
103                }
104            }
105        } else {
106            return Err(Error::GetHeaderMetadata);
107        }
108
109        self.reader
110            .read_exact(&mut self.marker)
111            .map_err(Error::ReadMarker)
112    }
113
114    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
115        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
116        // invalid bytes.
117        //
118        // The are two cases to handle here:
119        //
120        // 1. `n > self.buf.len()`:
121        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
122        // 2. `n < self.buf.len()`:
123        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
124        //
125        // TODO: Figure out a way to avoid having to truncate for the second case.
126        self.buf.resize(util::safe_len(n)?, 0);
127        self.reader
128            .read_exact(&mut self.buf)
129            .map_err(Error::ReadIntoBuf)?;
130        self.buf_idx = 0;
131        Ok(())
132    }
133
134    /// Try to read a data block, also performing schema resolution for the objects contained in
135    /// the block. The objects are stored in an internal buffer to the `Reader`.
136    fn read_block_next(&mut self) -> AvroResult<()> {
137        assert!(self.is_empty(), "Expected self to be empty!");
138        match util::read_long(&mut self.reader) {
139            Ok(block_len) => {
140                self.message_count = block_len as usize;
141                let block_bytes = util::read_long(&mut self.reader)?;
142                self.fill_buf(block_bytes as usize)?;
143                let mut marker = [0u8; 16];
144                self.reader
145                    .read_exact(&mut marker)
146                    .map_err(Error::ReadBlockMarker)?;
147
148                if marker != self.marker {
149                    return Err(Error::GetBlockMarker);
150                }
151
152                // NOTE (JAB): This doesn't fit this Reader pattern very well.
153                // `self.buf` is a growable buffer that is reused as the reader is iterated.
154                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
155                // and replace `buf` with the new one, instead of reusing the same buffer.
156                // We can address this by using some "limited read" type to decode directly
157                // into the buffer. But this is fine, for now.
158                self.codec.decompress(&mut self.buf)
159            }
160            Err(Error::ReadVariableIntegerBytes(io_err)) => {
161                if let ErrorKind::UnexpectedEof = io_err.kind() {
162                    // to not return any error in case we only finished to read cleanly from the stream
163                    Ok(())
164                } else {
165                    Err(Error::ReadVariableIntegerBytes(io_err))
166                }
167            }
168            Err(e) => Err(e),
169        }
170    }
171
172    fn len(&self) -> usize {
173        self.message_count
174    }
175
176    fn is_empty(&self) -> bool {
177        self.len() == 0
178    }
179
180    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
181        if self.is_empty() {
182            self.read_block_next()?;
183            if self.is_empty() {
184                return Ok(None);
185            }
186        }
187
188        let mut block_bytes = &self.buf[self.buf_idx..];
189        let b_original = block_bytes.len();
190
191        let item = decode_internal(
192            &self.writer_schema,
193            &self.names_refs,
194            &None,
195            &mut block_bytes,
196        )?;
197        let item = match read_schema {
198            Some(schema) => item.resolve(schema)?,
199            None => item,
200        };
201
202        if b_original != 0 && b_original == block_bytes.len() {
203            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
204            return Err(Error::ReadBlock);
205        }
206        self.buf_idx += b_original - block_bytes.len();
207        self.message_count -= 1;
208        Ok(Some(item))
209    }
210
211    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
212        let json: serde_json::Value = metadata
213            .get("avro.schema")
214            .and_then(|bytes| {
215                if let Value::Bytes(ref bytes) = *bytes {
216                    from_slice(bytes.as_ref()).ok()
217                } else {
218                    None
219                }
220            })
221            .ok_or(Error::GetAvroSchemaFromMap)?;
222        if !self.schemata.is_empty() {
223            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
224            let names: Names = rs
225                .get_names()
226                .iter()
227                .map(|(name, schema)| (name.clone(), (*schema).clone()))
228                .collect();
229            self.writer_schema = Schema::parse_with_names(&json, names)?;
230            resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
231        } else {
232            self.writer_schema = Schema::parse(&json)?;
233            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
234        }
235        Ok(())
236    }
237
238    fn read_user_metadata(&mut self, key: String, value: Value) {
239        match value {
240            Value::Bytes(ref vec) => {
241                self.user_metadata.insert(key, vec.clone());
242            }
243            wrong => {
244                warn!(
245                    "User metadata values must be Value::Bytes, found {:?}",
246                    wrong
247                );
248            }
249        }
250    }
251}
252
253fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
254    let result = metadata
255        .get("avro.codec")
256        .map(|codec| {
257            if let Value::Bytes(ref bytes) = *codec {
258                match std::str::from_utf8(bytes.as_ref()) {
259                    Ok(utf8) => Ok(utf8),
260                    Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
261                }
262            } else {
263                Err(Error::BadCodecMetadata)
264            }
265        })
266        .map(|codec_res| match codec_res {
267            Ok(codec) => match Codec::from_str(codec) {
268                Ok(codec) => match codec {
269                    #[cfg(feature = "bzip")]
270                    Codec::Bzip2(_) => {
271                        use crate::Bzip2Settings;
272                        if let Some(Value::Bytes(bytes)) =
273                            metadata.get("avro.codec.compression_level")
274                        {
275                            Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
276                        } else {
277                            Ok(codec)
278                        }
279                    }
280                    #[cfg(feature = "xz")]
281                    Codec::Xz(_) => {
282                        use crate::XzSettings;
283                        if let Some(Value::Bytes(bytes)) =
284                            metadata.get("avro.codec.compression_level")
285                        {
286                            Ok(Codec::Xz(XzSettings::new(bytes[0])))
287                        } else {
288                            Ok(codec)
289                        }
290                    }
291                    #[cfg(feature = "zstandard")]
292                    Codec::Zstandard(_) => {
293                        use crate::ZstandardSettings;
294                        if let Some(Value::Bytes(bytes)) =
295                            metadata.get("avro.codec.compression_level")
296                        {
297                            Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
298                        } else {
299                            Ok(codec)
300                        }
301                    }
302                    _ => Ok(codec),
303                },
304                Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
305            },
306            Err(err) => Err(err),
307        });
308
309    result.unwrap_or(Ok(Codec::Null))
310}
311
312/// Main interface for reading Avro formatted values.
313///
314/// To be used as an iterator:
315///
316/// ```no_run
317/// # use apache_avro::Reader;
318/// # use std::io::Cursor;
319/// # let input = Cursor::new(Vec::<u8>::new());
320/// for value in Reader::new(input).unwrap() {
321///     match value {
322///         Ok(v) => println!("{:?}", v),
323///         Err(e) => println!("Error: {}", e),
324///     };
325/// }
326/// ```
327pub struct Reader<'a, R> {
328    block: Block<'a, R>,
329    reader_schema: Option<&'a Schema>,
330    errored: bool,
331    should_resolve_schema: bool,
332}
333
334impl<'a, R: Read> Reader<'a, R> {
335    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
336    /// No reader `Schema` will be set.
337    ///
338    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
339    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
340        let block = Block::new(reader, vec![])?;
341        let reader = Reader {
342            block,
343            reader_schema: None,
344            errored: false,
345            should_resolve_schema: false,
346        };
347        Ok(reader)
348    }
349
350    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
351    /// to read from.
352    ///
353    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
354    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
355        let block = Block::new(reader, vec![schema])?;
356        let mut reader = Reader {
357            block,
358            reader_schema: Some(schema),
359            errored: false,
360            should_resolve_schema: false,
361        };
362        // Check if the reader and writer schemas disagree.
363        reader.should_resolve_schema = reader.writer_schema() != schema;
364        Ok(reader)
365    }
366
367    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
368    /// to read from.
369    ///
370    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
371    pub fn with_schemata(
372        schema: &'a Schema,
373        schemata: Vec<&'a Schema>,
374        reader: R,
375    ) -> AvroResult<Reader<'a, R>> {
376        let block = Block::new(reader, schemata)?;
377        let mut reader = Reader {
378            block,
379            reader_schema: Some(schema),
380            errored: false,
381            should_resolve_schema: false,
382        };
383        // Check if the reader and writer schemas disagree.
384        reader.should_resolve_schema = reader.writer_schema() != schema;
385        Ok(reader)
386    }
387
388    /// Get a reference to the writer `Schema`.
389    #[inline]
390    pub fn writer_schema(&self) -> &Schema {
391        &self.block.writer_schema
392    }
393
394    /// Get a reference to the optional reader `Schema`.
395    #[inline]
396    pub fn reader_schema(&self) -> Option<&Schema> {
397        self.reader_schema
398    }
399
400    /// Get a reference to the user metadata
401    #[inline]
402    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
403        &self.block.user_metadata
404    }
405
406    #[inline]
407    fn read_next(&mut self) -> AvroResult<Option<Value>> {
408        let read_schema = if self.should_resolve_schema {
409            self.reader_schema
410        } else {
411            None
412        };
413
414        self.block.read_next(read_schema)
415    }
416}
417
418impl<R: Read> Iterator for Reader<'_, R> {
419    type Item = AvroResult<Value>;
420
421    fn next(&mut self) -> Option<Self::Item> {
422        // to prevent keep on reading after the first error occurs
423        if self.errored {
424            return None;
425        };
426        match self.read_next() {
427            Ok(opt) => opt.map(Ok),
428            Err(e) => {
429                self.errored = true;
430                Some(Err(e))
431            }
432        }
433    }
434}
435
436/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
437/// to read from.
438///
439/// In case a reader `Schema` is provided, schema resolution will also be performed.
440///
441/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
442/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
443/// you are doing, instead.
444pub fn from_avro_datum<R: Read>(
445    writer_schema: &Schema,
446    reader: &mut R,
447    reader_schema: Option<&Schema>,
448) -> AvroResult<Value> {
449    let value = decode(writer_schema, reader)?;
450    match reader_schema {
451        Some(schema) => value.resolve(schema),
452        None => Ok(value),
453    }
454}
455
456/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
457/// to read from.
458/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
459/// schemata to resolve any dependencies.
460///
461/// In case a reader `Schema` is provided, schema resolution will also be performed.
462pub fn from_avro_datum_schemata<R: Read>(
463    writer_schema: &Schema,
464    writer_schemata: Vec<&Schema>,
465    reader: &mut R,
466    reader_schema: Option<&Schema>,
467) -> AvroResult<Value> {
468    from_avro_datum_reader_schemata(
469        writer_schema,
470        writer_schemata,
471        reader,
472        reader_schema,
473        Vec::with_capacity(0),
474    )
475}
476
477/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
478/// to read from.
479/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
480/// schemata to resolve any dependencies.
481///
482/// In case a reader `Schema` is provided, schema resolution will also be performed.
483pub fn from_avro_datum_reader_schemata<R: Read>(
484    writer_schema: &Schema,
485    writer_schemata: Vec<&Schema>,
486    reader: &mut R,
487    reader_schema: Option<&Schema>,
488    reader_schemata: Vec<&Schema>,
489) -> AvroResult<Value> {
490    let rs = ResolvedSchema::try_from(writer_schemata)?;
491    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
492    match reader_schema {
493        Some(schema) => {
494            if reader_schemata.is_empty() {
495                value.resolve(schema)
496            } else {
497                value.resolve_schemata(schema, reader_schemata)
498            }
499        }
500        None => Ok(value),
501    }
502}
503
504pub struct GenericSingleObjectReader {
505    write_schema: ResolvedOwnedSchema,
506    expected_header: Vec<u8>,
507}
508
509impl GenericSingleObjectReader {
510    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
511        let header_builder = RabinFingerprintHeader::from_schema(&schema);
512        Self::new_with_header_builder(schema, header_builder)
513    }
514
515    pub fn new_with_header_builder<HB: HeaderBuilder>(
516        schema: Schema,
517        header_builder: HB,
518    ) -> AvroResult<GenericSingleObjectReader> {
519        let expected_header = header_builder.build_header();
520        Ok(GenericSingleObjectReader {
521            write_schema: ResolvedOwnedSchema::try_from(schema)?,
522            expected_header,
523        })
524    }
525
526    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
527        let mut header = vec![0; self.expected_header.len()];
528        match reader.read_exact(&mut header) {
529            Ok(_) => {
530                if self.expected_header == header {
531                    decode_internal(
532                        self.write_schema.get_root_schema(),
533                        self.write_schema.get_names(),
534                        &None,
535                        reader,
536                    )
537                } else {
538                    Err(Error::SingleObjectHeaderMismatch(
539                        self.expected_header.clone(),
540                        header,
541                    ))
542                }
543            }
544            Err(io_error) => Err(Error::ReadHeader(io_error)),
545        }
546    }
547}
548
549pub struct SpecificSingleObjectReader<T>
550where
551    T: AvroSchema,
552{
553    inner: GenericSingleObjectReader,
554    _model: PhantomData<T>,
555}
556
557impl<T> SpecificSingleObjectReader<T>
558where
559    T: AvroSchema,
560{
561    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
562        Ok(SpecificSingleObjectReader {
563            inner: GenericSingleObjectReader::new(T::get_schema())?,
564            _model: PhantomData,
565        })
566    }
567}
568
569impl<T> SpecificSingleObjectReader<T>
570where
571    T: AvroSchema + From<Value>,
572{
573    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
574        self.inner.read_value(reader).map(|v| v.into())
575    }
576}
577
578impl<T> SpecificSingleObjectReader<T>
579where
580    T: AvroSchema + DeserializeOwned,
581{
582    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
583        from_value::<T>(&self.inner.read_value(reader)?)
584    }
585}
586
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`.
///
/// The marker is taken from the last 16 bytes of `bytes`.
///
/// # Panics
///
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    const MARKER_LEN: usize = 16;

    // A slice of exactly `MARKER_LEN` bytes is long enough to read a marker from.
    assert!(
        bytes.len() >= MARKER_LEN,
        "The bytes are too short to read a marker from them"
    );

    let mut marker = [0_u8; MARKER_LEN];
    marker.copy_from_slice(&bytes[bytes.len() - MARKER_LEN..]);
    marker
}
597
598#[cfg(test)]
599mod tests {
600    use super::*;
601    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
602    use apache_avro_test_helper::TestResult;
603    use pretty_assertions::assert_eq;
604    use serde::Deserialize;
605    use std::io::Cursor;
606    use uuid::Uuid;
607
608    const SCHEMA: &str = r#"
609    {
610      "type": "record",
611      "name": "test",
612      "fields": [
613        {
614          "name": "a",
615          "type": "long",
616          "default": 42
617        },
618        {
619          "name": "b",
620          "type": "string"
621        }
622      ]
623    }
624    "#;
625    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
626    const ENCODED: &[u8] = &[
627        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
628        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
629        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
630        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
631        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
632        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
633        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
634        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
635        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
636        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
637        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
638        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
639        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
640        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
641    ];
642
643    #[test]
644    fn test_from_avro_datum() -> TestResult {
645        let schema = Schema::parse_str(SCHEMA)?;
646        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
647
648        let mut record = Record::new(&schema).unwrap();
649        record.put("a", 27i64);
650        record.put("b", "foo");
651        let expected = record.into();
652
653        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
654
655        Ok(())
656    }
657
658    #[test]
659    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
660        const TEST_RECORD_SCHEMA_3240: &str = r#"
661    {
662      "type": "record",
663      "name": "test",
664      "fields": [
665        {
666          "name": "a",
667          "type": "long",
668          "default": 42
669        },
670        {
671          "name": "b",
672          "type": "string"
673        },
674        {
675            "name": "a_nullable_array",
676            "type": ["null", {"type": "array", "items": {"type": "string"}}],
677            "default": null
678        },
679        {
680            "name": "a_nullable_boolean",
681            "type": ["null", {"type": "boolean"}],
682            "default": null
683        },
684        {
685            "name": "a_nullable_string",
686            "type": ["null", {"type": "string"}],
687            "default": null
688        }
689      ]
690    }
691    "#;
692        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
693        struct TestRecord3240 {
694            a: i64,
695            b: String,
696            a_nullable_array: Option<Vec<String>>,
697            // we are missing the 'a_nullable_boolean' field to simulate missing keys
698            // a_nullable_boolean: Option<bool>,
699            a_nullable_string: Option<String>,
700        }
701
702        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
703        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
704
705        let expected_record: TestRecord3240 = TestRecord3240 {
706            a: 27i64,
707            b: String::from("foo"),
708            a_nullable_array: None,
709            a_nullable_string: None,
710        };
711
712        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
713        let parsed_record: TestRecord3240 = match &avro_datum {
714            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
715            unexpected => {
716                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
717            }
718        };
719
720        assert_eq!(parsed_record, expected_record);
721
722        Ok(())
723    }
724
725    #[test]
726    fn test_null_union() -> TestResult {
727        let schema = Schema::parse_str(UNION_SCHEMA)?;
728        let mut encoded: &'static [u8] = &[2, 0];
729
730        assert_eq!(
731            from_avro_datum(&schema, &mut encoded, None)?,
732            Value::Union(1, Box::new(Value::Long(0)))
733        );
734
735        Ok(())
736    }
737
738    #[test]
739    fn test_reader_iterator() -> TestResult {
740        let schema = Schema::parse_str(SCHEMA)?;
741        let reader = Reader::with_schema(&schema, ENCODED)?;
742
743        let mut record1 = Record::new(&schema).unwrap();
744        record1.put("a", 27i64);
745        record1.put("b", "foo");
746
747        let mut record2 = Record::new(&schema).unwrap();
748        record2.put("a", 42i64);
749        record2.put("b", "bar");
750
751        let expected = [record1.into(), record2.into()];
752
753        for (i, value) in reader.enumerate() {
754            assert_eq!(value?, expected[i]);
755        }
756
757        Ok(())
758    }
759
760    #[test]
761    fn test_reader_invalid_header() -> TestResult {
762        let schema = Schema::parse_str(SCHEMA)?;
763        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
764        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
765
766        Ok(())
767    }
768
769    #[test]
770    fn test_reader_invalid_block() -> TestResult {
771        let schema = Schema::parse_str(SCHEMA)?;
772        let invalid = ENCODED
773            .iter()
774            .copied()
775            .rev()
776            .skip(19)
777            .collect::<Vec<u8>>()
778            .into_iter()
779            .rev()
780            .collect::<Vec<u8>>();
781        let reader = Reader::with_schema(&schema, &invalid[..])?;
782        for value in reader {
783            assert!(value.is_err());
784        }
785
786        Ok(())
787    }
788
789    #[test]
790    fn test_reader_empty_buffer() -> TestResult {
791        let empty = Cursor::new(Vec::new());
792        assert!(Reader::new(empty).is_err());
793
794        Ok(())
795    }
796
797    #[test]
798    fn test_reader_only_header() -> TestResult {
799        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
800        let reader = Reader::new(&invalid[..])?;
801        for value in reader {
802            assert!(value.is_err());
803        }
804
805        Ok(())
806    }
807
808    #[test]
809    fn test_avro_3405_read_user_metadata_success() -> TestResult {
810        use crate::writer::Writer;
811
812        let schema = Schema::parse_str(SCHEMA)?;
813        let mut writer = Writer::new(&schema, Vec::new());
814
815        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
816        user_meta_data.insert(
817            "stringKey".to_string(),
818            "stringValue".to_string().into_bytes(),
819        );
820        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
821        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
822
823        for (k, v) in user_meta_data.iter() {
824            writer.add_user_metadata(k.to_string(), v)?;
825        }
826
827        let mut record = Record::new(&schema).unwrap();
828        record.put("a", 27i64);
829        record.put("b", "foo");
830
831        writer.append(record.clone())?;
832        writer.append(record.clone())?;
833        writer.flush()?;
834        let result = writer.into_inner()?;
835
836        let reader = Reader::new(&result[..])?;
837        assert_eq!(reader.user_metadata(), &user_meta_data);
838
839        Ok(())
840    }
841
842    #[derive(Deserialize, Clone, PartialEq, Debug)]
843    struct TestSingleObjectReader {
844        a: i64,
845        b: f64,
846        c: Vec<String>,
847    }
848
849    impl AvroSchema for TestSingleObjectReader {
850        fn get_schema() -> Schema {
851            let schema = r#"
852            {
853                "type":"record",
854                "name":"TestSingleObjectWrtierSerialize",
855                "fields":[
856                    {
857                        "name":"a",
858                        "type":"long"
859                    },
860                    {
861                        "name":"b",
862                        "type":"double"
863                    },
864                    {
865                        "name":"c",
866                        "type":{
867                            "type":"array",
868                            "items":"string"
869                        }
870                    }
871                ]
872            }
873            "#;
874            Schema::parse_str(schema).unwrap()
875        }
876    }
877
878    impl From<Value> for TestSingleObjectReader {
879        fn from(obj: Value) -> TestSingleObjectReader {
880            if let Value::Record(fields) = obj {
881                let mut a = None;
882                let mut b = None;
883                let mut c = vec![];
884                for (field_name, v) in fields {
885                    match (field_name.as_str(), v) {
886                        ("a", Value::Long(i)) => a = Some(i),
887                        ("b", Value::Double(d)) => b = Some(d),
888                        ("c", Value::Array(v)) => {
889                            for inner_val in v {
890                                if let Value::String(s) = inner_val {
891                                    c.push(s);
892                                }
893                            }
894                        }
895                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
896                    }
897                }
898                TestSingleObjectReader {
899                    a: a.unwrap(),
900                    b: b.unwrap(),
901                    c,
902                }
903            } else {
904                panic!("Expected a Value::Record but was {obj:?}")
905            }
906        }
907    }
908
909    impl From<TestSingleObjectReader> for Value {
910        fn from(obj: TestSingleObjectReader) -> Value {
911            Value::Record(vec![
912                ("a".into(), obj.a.into()),
913                ("b".into(), obj.b.into()),
914                (
915                    "c".into(),
916                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
917                ),
918            ])
919        }
920    }
921
922    #[test]
923    fn test_avro_3507_single_object_reader() -> TestResult {
924        let obj = TestSingleObjectReader {
925            a: 42,
926            b: 3.33,
927            c: vec!["cat".into(), "dog".into()],
928        };
929        let mut to_read = Vec::<u8>::new();
930        to_read.extend_from_slice(&[0xC3, 0x01]);
931        to_read.extend_from_slice(
932            &TestSingleObjectReader::get_schema()
933                .fingerprint::<Rabin>()
934                .bytes[..],
935        );
936        encode(
937            &obj.clone().into(),
938            &TestSingleObjectReader::get_schema(),
939            &mut to_read,
940        )
941        .expect("Encode should succeed");
942        let mut to_read = &to_read[..];
943        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
944            .expect("Schema should resolve");
945        let val = generic_reader
946            .read_value(&mut to_read)
947            .expect("Should read");
948        let expected_value: Value = obj.into();
949        assert_eq!(expected_value, val);
950
951        Ok(())
952    }
953
954    #[test]
955    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
956        let obj = TestSingleObjectReader {
957            a: 42,
958            b: 3.33,
959            c: vec!["cat".into(), "dog".into()],
960        };
961        // The two-byte marker, to show that the message uses this single-record format
962        let to_read_1 = [0xC3, 0x01];
963        let mut to_read_2 = Vec::<u8>::new();
964        to_read_2.extend_from_slice(
965            &TestSingleObjectReader::get_schema()
966                .fingerprint::<Rabin>()
967                .bytes[..],
968        );
969        let mut to_read_3 = Vec::<u8>::new();
970        encode(
971            &obj.clone().into(),
972            &TestSingleObjectReader::get_schema(),
973            &mut to_read_3,
974        )
975        .expect("Encode should succeed");
976        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
977        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
978            .expect("Schema should resolve");
979        let val = generic_reader
980            .read_value(&mut to_read)
981            .expect("Should read");
982        let expected_value: Value = obj.into();
983        assert_eq!(expected_value, val);
984
985        Ok(())
986    }
987
988    #[test]
989    fn test_avro_3507_reader_parity() -> TestResult {
990        let obj = TestSingleObjectReader {
991            a: 42,
992            b: 3.33,
993            c: vec!["cat".into(), "dog".into()],
994        };
995
996        let mut to_read = Vec::<u8>::new();
997        to_read.extend_from_slice(&[0xC3, 0x01]);
998        to_read.extend_from_slice(
999            &TestSingleObjectReader::get_schema()
1000                .fingerprint::<Rabin>()
1001                .bytes[..],
1002        );
1003        encode(
1004            &obj.clone().into(),
1005            &TestSingleObjectReader::get_schema(),
1006            &mut to_read,
1007        )
1008        .expect("Encode should succeed");
1009        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
1010            .expect("Schema should resolve");
1011        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
1012            .expect("schema should resolve");
1013        let mut to_read1 = &to_read[..];
1014        let mut to_read2 = &to_read[..];
1015        let mut to_read3 = &to_read[..];
1016
1017        let val = generic_reader
1018            .read_value(&mut to_read1)
1019            .expect("Should read");
1020        let read_obj1 = specific_reader
1021            .read_from_value(&mut to_read2)
1022            .expect("Should read from value");
1023        let read_obj2 = specific_reader
1024            .read(&mut to_read3)
1025            .expect("Should read from deserilize");
1026        let expected_value: Value = obj.clone().into();
1027        assert_eq!(obj, read_obj1);
1028        assert_eq!(obj, read_obj2);
1029        assert_eq!(val, expected_value);
1030
1031        Ok(())
1032    }
1033
1034    #[test]
1035    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
1036        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1037        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1038        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
1039            TestSingleObjectReader::get_schema(),
1040            header_builder,
1041        )
1042        .expect("failed to build reader");
1043        let data_to_read: Vec<u8> = vec![
1044            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
1045        ];
1046        let mut to_read = &data_to_read[..];
1047        let read_result = generic_reader.read_value(&mut to_read);
1048        matches!(read_result, Err(crate::Error::ReadBytes(_)));
1049        Ok(())
1050    }
1051
1052    #[cfg(not(feature = "snappy"))]
1053    #[test]
1054    fn test_avro_3549_read_not_enabled_codec() {
1055        let snappy_compressed_avro = vec![
1056            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
1057            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
1058            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
1059            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
1060            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
1061            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
1062            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
1063            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
1064            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
1065            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
1066        ];
1067
1068        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
1069            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
1070        } else {
1071            panic!("Expected an error in the reading of the codec!");
1072        }
1073    }
1074}