apache_avro/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19use crate::{
20    decode::{decode, decode_internal},
21    from_value,
22    rabin::Rabin,
23    schema::{
24        resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25        ResolvedSchema, Schema,
26    },
27    types::Value,
28    util, AvroResult, Codec, Error,
29};
30use log::warn;
31use serde::de::DeserializeOwned;
32use serde_json::from_slice;
33use std::{
34    collections::HashMap,
35    io::{ErrorKind, Read},
36    marker::PhantomData,
37    str::FromStr,
38};
39
/// Internal Block reader.
///
/// Bundles the underlying reader with the state parsed from the container header and the
/// buffered bytes of the data block that is currently being decoded.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Source the header and the data blocks are read from.
    reader: R,
    /// Internal buffering to reduce allocation.
    buf: Vec<u8>,
    /// Offset into `buf` at which the next object starts.
    buf_idx: usize,
    /// Number of elements expected to exist within this block.
    /// Decremented each time an object is decoded.
    message_count: usize,
    /// Marker read from the header; every data block must be followed by the same 16 bytes.
    marker: [u8; 16],
    /// Codec used to decompress data blocks (`Codec::Null` until the header is parsed).
    codec: Codec,
    /// Schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Extra schemata supplied by the caller; consulted while parsing the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header entries whose keys do not start with `avro.`.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Resolved named schemas handed to the decoder for every object.
    names_refs: Names,
}
56
57impl<'r, R: Read> Block<'r, R> {
58    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
59        let mut block = Block {
60            reader,
61            codec: Codec::Null,
62            writer_schema: Schema::Null,
63            schemata,
64            buf: vec![],
65            buf_idx: 0,
66            message_count: 0,
67            marker: [0; 16],
68            user_metadata: Default::default(),
69            names_refs: Default::default(),
70        };
71
72        block.read_header()?;
73        Ok(block)
74    }
75
76    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
77    /// its content.
78    fn read_header(&mut self) -> AvroResult<()> {
79        let mut buf = [0u8; 4];
80        self.reader
81            .read_exact(&mut buf)
82            .map_err(Error::ReadHeader)?;
83
84        if buf != [b'O', b'b', b'j', 1u8] {
85            return Err(Error::HeaderMagic);
86        }
87
88        let meta_schema = Schema::map(Schema::Bytes);
89        if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90            self.read_writer_schema(&metadata)?;
91            self.codec = read_codec(&metadata)?;
92
93            for (key, value) in metadata {
94                if key == "avro.schema"
95                    || key == "avro.codec"
96                    || key == "avro.codec.compression_level"
97                {
98                    // already processed
99                } else if key.starts_with("avro.") {
100                    warn!("Ignoring unknown metadata key: {}", key);
101                } else {
102                    self.read_user_metadata(key, value);
103                }
104            }
105        } else {
106            return Err(Error::GetHeaderMetadata);
107        }
108
109        self.reader
110            .read_exact(&mut self.marker)
111            .map_err(Error::ReadMarker)
112    }
113
114    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
115        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
116        // invalid bytes.
117        //
118        // The are two cases to handle here:
119        //
120        // 1. `n > self.buf.len()`:
121        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
122        // 2. `n < self.buf.len()`:
123        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
124        //
125        // TODO: Figure out a way to avoid having to truncate for the second case.
126        self.buf.resize(util::safe_len(n)?, 0);
127        self.reader
128            .read_exact(&mut self.buf)
129            .map_err(Error::ReadIntoBuf)?;
130        self.buf_idx = 0;
131        Ok(())
132    }
133
134    /// Try to read a data block, also performing schema resolution for the objects contained in
135    /// the block. The objects are stored in an internal buffer to the `Reader`.
136    fn read_block_next(&mut self) -> AvroResult<()> {
137        assert!(self.is_empty(), "Expected self to be empty!");
138        match util::read_long(&mut self.reader) {
139            Ok(block_len) => {
140                self.message_count = block_len as usize;
141                let block_bytes = util::read_long(&mut self.reader)?;
142                self.fill_buf(block_bytes as usize)?;
143                let mut marker = [0u8; 16];
144                self.reader
145                    .read_exact(&mut marker)
146                    .map_err(Error::ReadBlockMarker)?;
147
148                if marker != self.marker {
149                    return Err(Error::GetBlockMarker);
150                }
151
152                // NOTE (JAB): This doesn't fit this Reader pattern very well.
153                // `self.buf` is a growable buffer that is reused as the reader is iterated.
154                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
155                // and replace `buf` with the new one, instead of reusing the same buffer.
156                // We can address this by using some "limited read" type to decode directly
157                // into the buffer. But this is fine, for now.
158                self.codec.decompress(&mut self.buf)
159            }
160            Err(Error::ReadVariableIntegerBytes(io_err)) => {
161                if let ErrorKind::UnexpectedEof = io_err.kind() {
162                    // to not return any error in case we only finished to read cleanly from the stream
163                    Ok(())
164                } else {
165                    Err(Error::ReadVariableIntegerBytes(io_err))
166                }
167            }
168            Err(e) => Err(e),
169        }
170    }
171
172    fn len(&self) -> usize {
173        self.message_count
174    }
175
176    fn is_empty(&self) -> bool {
177        self.len() == 0
178    }
179
180    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
181        if self.is_empty() {
182            self.read_block_next()?;
183            if self.is_empty() {
184                return Ok(None);
185            }
186        }
187
188        let mut block_bytes = &self.buf[self.buf_idx..];
189        let b_original = block_bytes.len();
190
191        let item = decode_internal(
192            &self.writer_schema,
193            &self.names_refs,
194            &None,
195            &mut block_bytes,
196        )?;
197        let item = match read_schema {
198            Some(schema) => item.resolve(schema)?,
199            None => item,
200        };
201
202        if b_original != 0 && b_original == block_bytes.len() {
203            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
204            return Err(Error::ReadBlock);
205        }
206        self.buf_idx += b_original - block_bytes.len();
207        self.message_count -= 1;
208        Ok(Some(item))
209    }
210
211    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
212        let json: serde_json::Value = metadata
213            .get("avro.schema")
214            .and_then(|bytes| {
215                if let Value::Bytes(ref bytes) = *bytes {
216                    from_slice(bytes.as_ref()).ok()
217                } else {
218                    None
219                }
220            })
221            .ok_or(Error::GetAvroSchemaFromMap)?;
222        if !self.schemata.is_empty() {
223            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
224            let names: Names = rs
225                .get_names()
226                .iter()
227                .map(|(name, schema)| (name.clone(), (*schema).clone()))
228                .collect();
229            self.writer_schema = Schema::parse_with_names(&json, names)?;
230            resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
231        } else {
232            self.writer_schema = Schema::parse(&json)?;
233            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
234        }
235        Ok(())
236    }
237
238    fn read_user_metadata(&mut self, key: String, value: Value) {
239        match value {
240            Value::Bytes(ref vec) => {
241                self.user_metadata.insert(key, vec.clone());
242            }
243            wrong => {
244                warn!(
245                    "User metadata values must be Value::Bytes, found {:?}",
246                    wrong
247                );
248            }
249        }
250    }
251}
252
253fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
254    let result = metadata
255        .get("avro.codec")
256        .map(|codec| {
257            if let Value::Bytes(ref bytes) = *codec {
258                match std::str::from_utf8(bytes.as_ref()) {
259                    Ok(utf8) => Ok(utf8),
260                    Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
261                }
262            } else {
263                Err(Error::BadCodecMetadata)
264            }
265        })
266        .map(|codec_res| match codec_res {
267            Ok(codec) => match Codec::from_str(codec) {
268                Ok(codec) => match codec {
269                    #[cfg(feature = "bzip")]
270                    Codec::Bzip2(_) => {
271                        use crate::Bzip2Settings;
272                        if let Some(Value::Bytes(bytes)) =
273                            metadata.get("avro.codec.compression_level")
274                        {
275                            Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
276                        } else {
277                            Ok(codec)
278                        }
279                    }
280                    #[cfg(feature = "xz")]
281                    Codec::Xz(_) => {
282                        use crate::XzSettings;
283                        if let Some(Value::Bytes(bytes)) =
284                            metadata.get("avro.codec.compression_level")
285                        {
286                            Ok(Codec::Xz(XzSettings::new(bytes[0])))
287                        } else {
288                            Ok(codec)
289                        }
290                    }
291                    #[cfg(feature = "zstandard")]
292                    Codec::Zstandard(_) => {
293                        use crate::ZstandardSettings;
294                        if let Some(Value::Bytes(bytes)) =
295                            metadata.get("avro.codec.compression_level")
296                        {
297                            Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
298                        } else {
299                            Ok(codec)
300                        }
301                    }
302                    _ => Ok(codec),
303                },
304                Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
305            },
306            Err(err) => Err(err),
307        });
308
309    result.unwrap_or(Ok(Codec::Null))
310}
311
/// Main interface for reading Avro formatted values.
///
/// To be used as an iterator:
///
/// ```no_run
/// # use apache_avro::Reader;
/// # use std::io::Cursor;
/// # let input = Cursor::new(Vec::<u8>::new());
/// for value in Reader::new(input).unwrap() {
///     match value {
///         Ok(v) => println!("{:?}", v),
///         Err(e) => println!("Error: {}", e),
///     };
/// }
/// ```
pub struct Reader<'a, R> {
    /// Block-level reader holding the parsed header and the buffered data block.
    block: Block<'a, R>,
    /// Schema the caller wants values resolved to, if any.
    reader_schema: Option<&'a Schema>,
    /// Set once an error has been yielded; iteration stops afterwards.
    errored: bool,
    /// `true` when a reader schema was given and it differs from the writer schema.
    should_resolve_schema: bool,
}
333
334impl<'a, R: Read> Reader<'a, R> {
335    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
336    /// No reader `Schema` will be set.
337    ///
338    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
339    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
340        let block = Block::new(reader, vec![])?;
341        let reader = Reader {
342            block,
343            reader_schema: None,
344            errored: false,
345            should_resolve_schema: false,
346        };
347        Ok(reader)
348    }
349
350    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
351    /// to read from.
352    ///
353    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
354    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
355        let block = Block::new(reader, vec![schema])?;
356        let mut reader = Reader {
357            block,
358            reader_schema: Some(schema),
359            errored: false,
360            should_resolve_schema: false,
361        };
362        // Check if the reader and writer schemas disagree.
363        reader.should_resolve_schema = reader.writer_schema() != schema;
364        Ok(reader)
365    }
366
367    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
368    /// to read from.
369    ///
370    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
371    pub fn with_schemata(
372        schema: &'a Schema,
373        schemata: Vec<&'a Schema>,
374        reader: R,
375    ) -> AvroResult<Reader<'a, R>> {
376        let block = Block::new(reader, schemata)?;
377        let mut reader = Reader {
378            block,
379            reader_schema: Some(schema),
380            errored: false,
381            should_resolve_schema: false,
382        };
383        // Check if the reader and writer schemas disagree.
384        reader.should_resolve_schema = reader.writer_schema() != schema;
385        Ok(reader)
386    }
387
388    /// Get a reference to the writer `Schema`.
389    #[inline]
390    pub fn writer_schema(&self) -> &Schema {
391        &self.block.writer_schema
392    }
393
394    /// Get a reference to the optional reader `Schema`.
395    #[inline]
396    pub fn reader_schema(&self) -> Option<&Schema> {
397        self.reader_schema
398    }
399
400    /// Get a reference to the user metadata
401    #[inline]
402    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
403        &self.block.user_metadata
404    }
405
406    #[inline]
407    fn read_next(&mut self) -> AvroResult<Option<Value>> {
408        let read_schema = if self.should_resolve_schema {
409            self.reader_schema
410        } else {
411            None
412        };
413
414        self.block.read_next(read_schema)
415    }
416}
417
418impl<R: Read> Iterator for Reader<'_, R> {
419    type Item = AvroResult<Value>;
420
421    fn next(&mut self) -> Option<Self::Item> {
422        // to prevent keep on reading after the first error occurs
423        if self.errored {
424            return None;
425        };
426        match self.read_next() {
427            Ok(opt) => opt.map(Ok),
428            Err(e) => {
429                self.errored = true;
430                Some(Err(e))
431            }
432        }
433    }
434}
435
436/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
437/// to read from.
438///
439/// In case a reader `Schema` is provided, schema resolution will also be performed.
440///
441/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
442/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
443/// you are doing, instead.
444pub fn from_avro_datum<R: Read>(
445    writer_schema: &Schema,
446    reader: &mut R,
447    reader_schema: Option<&Schema>,
448) -> AvroResult<Value> {
449    let value = decode(writer_schema, reader)?;
450    match reader_schema {
451        Some(schema) => value.resolve(schema),
452        None => Ok(value),
453    }
454}
455
456/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
457/// to read from.
458/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
459/// schemata to resolve any dependencies.
460///
461/// In case a reader `Schema` is provided, schema resolution will also be performed.
462pub fn from_avro_datum_schemata<R: Read>(
463    writer_schema: &Schema,
464    writer_schemata: Vec<&Schema>,
465    reader: &mut R,
466    reader_schema: Option<&Schema>,
467) -> AvroResult<Value> {
468    from_avro_datum_reader_schemata(
469        writer_schema,
470        writer_schemata,
471        reader,
472        reader_schema,
473        Vec::with_capacity(0),
474    )
475}
476
477/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
478/// to read from.
479/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
480/// schemata to resolve any dependencies.
481///
482/// In case a reader `Schema` is provided, schema resolution will also be performed.
483pub fn from_avro_datum_reader_schemata<R: Read>(
484    writer_schema: &Schema,
485    writer_schemata: Vec<&Schema>,
486    reader: &mut R,
487    reader_schema: Option<&Schema>,
488    reader_schemata: Vec<&Schema>,
489) -> AvroResult<Value> {
490    let rs = ResolvedSchema::try_from(writer_schemata)?;
491    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
492    match reader_schema {
493        Some(schema) => {
494            if reader_schemata.is_empty() {
495                value.resolve(schema)
496            } else {
497                value.resolve_schemata(schema, reader_schemata)
498            }
499        }
500        None => Ok(value),
501    }
502}
503
/// Reads values that are prefixed with a 10-byte header and decodes them into `Value`s.
pub struct GenericSingleObjectReader {
    /// Resolved writer schema used to decode the payload.
    write_schema: ResolvedOwnedSchema,
    /// Header every input must start with: `0xC3 0x01` followed by the first eight bytes of
    /// the schema's Rabin fingerprint.
    expected_header: [u8; 10],
}
508
509impl GenericSingleObjectReader {
510    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
511        let fingerprint = schema.fingerprint::<Rabin>();
512        let expected_header = [
513            0xC3,
514            0x01,
515            fingerprint.bytes[0],
516            fingerprint.bytes[1],
517            fingerprint.bytes[2],
518            fingerprint.bytes[3],
519            fingerprint.bytes[4],
520            fingerprint.bytes[5],
521            fingerprint.bytes[6],
522            fingerprint.bytes[7],
523        ];
524        Ok(GenericSingleObjectReader {
525            write_schema: ResolvedOwnedSchema::try_from(schema)?,
526            expected_header,
527        })
528    }
529
530    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
531        let mut header: [u8; 10] = [0; 10];
532        match reader.read_exact(&mut header) {
533            Ok(_) => {
534                if self.expected_header == header {
535                    decode_internal(
536                        self.write_schema.get_root_schema(),
537                        self.write_schema.get_names(),
538                        &None,
539                        reader,
540                    )
541                } else {
542                    Err(Error::SingleObjectHeaderMismatch(
543                        self.expected_header,
544                        header,
545                    ))
546                }
547            }
548            Err(io_error) => Err(Error::ReadHeader(io_error)),
549        }
550    }
551}
552
/// Typed wrapper around [`GenericSingleObjectReader`] whose schema comes from
/// `T::get_schema()`.
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Untyped reader doing the header check and the decoding.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a value of that type.
    _model: PhantomData<T>,
}
560
561impl<T> SpecificSingleObjectReader<T>
562where
563    T: AvroSchema,
564{
565    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
566        Ok(SpecificSingleObjectReader {
567            inner: GenericSingleObjectReader::new(T::get_schema())?,
568            _model: PhantomData,
569        })
570    }
571}
572
573impl<T> SpecificSingleObjectReader<T>
574where
575    T: AvroSchema + From<Value>,
576{
577    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
578        self.inner.read_value(reader).map(|v| v.into())
579    }
580}
581
582impl<T> SpecificSingleObjectReader<T>
583where
584    T: AvroSchema + DeserializeOwned,
585{
586    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
587        from_value::<T>(&self.inner.read_value(reader)?)
588    }
589}
590
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`
///
/// The marker is taken from the last 16 bytes of `bytes`.
///
/// # Panics
/// Panics when `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    // Exactly 16 bytes are enough to hold a marker, so only shorter inputs are rejected.
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[(bytes.len() - 16)..]);
    marker
}
601
602#[cfg(test)]
603mod tests {
604    use super::*;
605    use crate::{encode::encode, types::Record};
606    use apache_avro_test_helper::TestResult;
607    use pretty_assertions::assert_eq;
608    use serde::Deserialize;
609    use std::io::Cursor;
610
/// Record schema shared by the tests: record `test` with a `long` field `a` (default 42)
/// and a `string` field `b`.
const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
/// Union schema with the branches `null` and `long`.
const UNION_SCHEMA: &str = r#"["null", "long"]"#;
/// A complete container file used as reader input.
///
/// Layout: the magic `Obj\x01`, a metadata map with `avro.schema` (the record schema as
/// compact JSON) and `avro.codec` (`null`), the 16-byte marker, then one data block
/// announcing two objects in ten bytes, followed by the same marker.
const ENCODED: &[u8] = &[
    79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
    101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
    114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
    58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
    100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
    97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
    103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
    50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
    44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
    110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
    100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
    207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
    6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
    207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
];
645
/// Decoding a bare datum with the record schema yields the expected record.
#[test]
fn test_from_avro_datum() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    let expected: Value = record.into();

    let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
    let decoded = from_avro_datum(&schema, &mut encoded, None)?;

    assert_eq!(decoded, expected);

    Ok(())
}
660
/// A decoded record can be mapped onto a struct that omits one of the nullable fields.
#[test]
fn test_from_avro_datum_with_union_to_struct() -> TestResult {
    const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;

    #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
    struct TestRecord3240 {
        a: i64,
        b: String,
        a_nullable_array: Option<Vec<String>>,
        // we are missing the 'a_nullable_boolean' field to simulate missing keys
        // a_nullable_boolean: Option<bool>,
        a_nullable_string: Option<String>,
    }

    let expected_record = TestRecord3240 {
        a: 27i64,
        b: String::from("foo"),
        a_nullable_array: None,
        a_nullable_string: None,
    };

    let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
    let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
    let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;

    let parsed_record: TestRecord3240 = if let Value::Record(_) = &avro_datum {
        from_value::<TestRecord3240>(&avro_datum)?
    } else {
        let unexpected = &avro_datum;
        panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
    };

    assert_eq!(parsed_record, expected_record);

    Ok(())
}
727
/// Decoding `[2, 0]` with the union schema selects branch 1 holding the long `0`.
#[test]
fn test_null_union() -> TestResult {
    let schema = Schema::parse_str(UNION_SCHEMA)?;
    let mut encoded: &'static [u8] = &[2, 0];

    let decoded = from_avro_datum(&schema, &mut encoded, None)?;
    let expected = Value::Union(1, Box::new(Value::Long(0)));
    assert_eq!(decoded, expected);

    Ok(())
}
740
/// Iterating over the encoded container yields exactly the two expected records, in order.
///
/// All values are collected before comparing, so a reader that yields too few (or no)
/// records fails the test instead of passing vacuously.
#[test]
fn test_reader_iterator() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let reader = Reader::with_schema(&schema, ENCODED)?;

    let mut record1 = Record::new(&schema).unwrap();
    record1.put("a", 27i64);
    record1.put("b", "foo");

    let mut record2 = Record::new(&schema).unwrap();
    record2.put("a", 42i64);
    record2.put("b", "bar");

    let expected: Vec<Value> = vec![record1.into(), record2.into()];

    let actual = reader.collect::<Result<Vec<Value>, _>>()?;
    assert_eq!(actual, expected);

    Ok(())
}
762
/// Dropping the first byte breaks the header, so constructing the reader must fail.
#[test]
fn test_reader_invalid_header() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let invalid: &[u8] = &ENCODED[1..];

    let outcome = Reader::with_schema(&schema, invalid);
    assert!(outcome.is_err());

    Ok(())
}
771
/// Cutting the last 19 bytes removes the trailing marker and part of the block payload, so
/// the header still parses but reading the block must fail.
///
/// The results are collected first so that the test also fails when the reader yields
/// nothing at all.
#[test]
fn test_reader_invalid_block() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let invalid: &[u8] = &ENCODED[..ENCODED.len() - 19];

    let reader = Reader::with_schema(&schema, invalid)?;
    let results: Vec<_> = reader.collect();

    assert!(
        !results.is_empty(),
        "a truncated block must be reported as an error"
    );
    assert!(results.iter().all(|value| value.is_err()));

    Ok(())
}
791
/// An empty input has no header, so constructing the reader must fail.
#[test]
fn test_reader_empty_buffer() -> TestResult {
    let empty = Cursor::new(Vec::<u8>::new());
    let outcome = Reader::new(empty);
    assert!(outcome.is_err());

    Ok(())
}
799
/// The first 165 bytes hold the header plus the object count and byte size of the first
/// block, but none of its payload, so reading the block must fail.
///
/// The results are collected first so that the test also fails when the reader yields
/// nothing at all.
#[test]
fn test_reader_only_header() -> TestResult {
    let invalid: &[u8] = &ENCODED[..165];

    let reader = Reader::new(invalid)?;
    let results: Vec<_> = reader.collect();

    assert!(
        !results.is_empty(),
        "a block without payload must be reported as an error"
    );
    assert!(results.iter().all(|value| value.is_err()));

    Ok(())
}
810
/// User metadata added through the `Writer` is returned unchanged by the `Reader`.
#[test]
fn test_avro_3405_read_user_metadata_success() -> TestResult {
    use crate::writer::Writer;

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new());

    let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
    user_meta_data.insert(
        "stringKey".to_string(),
        "stringValue".to_string().into_bytes(),
    );
    user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
    user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

    for (key, value) in &user_meta_data {
        writer.add_user_metadata(key.to_string(), value)?;
    }

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    // Two identical records are written before flushing.
    for _ in 0..2 {
        writer.append(record.clone())?;
    }
    writer.flush()?;
    let written = writer.into_inner()?;

    let reader = Reader::new(&written[..])?;
    assert_eq!(reader.user_metadata(), &user_meta_data);

    Ok(())
}
844
/// Model type used by the single-object reader tests.
#[derive(Deserialize, Clone, PartialEq, Debug)]
struct TestSingleObjectReader {
    /// Maps to the `long` field `a` of the test schema.
    a: i64,
    /// Maps to the `double` field `b` of the test schema.
    b: f64,
    /// Maps to the array-of-strings field `c` of the test schema.
    c: Vec<String>,
}
851
852    impl AvroSchema for TestSingleObjectReader {
853        fn get_schema() -> Schema {
854            let schema = r#"
855            {
856                "type":"record",
857                "name":"TestSingleObjectWrtierSerialize",
858                "fields":[
859                    {
860                        "name":"a",
861                        "type":"long"
862                    },
863                    {
864                        "name":"b",
865                        "type":"double"
866                    },
867                    {
868                        "name":"c",
869                        "type":{
870                            "type":"array",
871                            "items":"string"
872                        }
873                    }
874                ]
875            }
876            "#;
877            Schema::parse_str(schema).unwrap()
878        }
879    }
880
881    impl From<Value> for TestSingleObjectReader {
882        fn from(obj: Value) -> TestSingleObjectReader {
883            if let Value::Record(fields) = obj {
884                let mut a = None;
885                let mut b = None;
886                let mut c = vec![];
887                for (field_name, v) in fields {
888                    match (field_name.as_str(), v) {
889                        ("a", Value::Long(i)) => a = Some(i),
890                        ("b", Value::Double(d)) => b = Some(d),
891                        ("c", Value::Array(v)) => {
892                            for inner_val in v {
893                                if let Value::String(s) = inner_val {
894                                    c.push(s);
895                                }
896                            }
897                        }
898                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
899                    }
900                }
901                TestSingleObjectReader {
902                    a: a.unwrap(),
903                    b: b.unwrap(),
904                    c,
905                }
906            } else {
907                panic!("Expected a Value::Record but was {obj:?}")
908            }
909        }
910    }
911
912    impl From<TestSingleObjectReader> for Value {
913        fn from(obj: TestSingleObjectReader) -> Value {
914            Value::Record(vec![
915                ("a".into(), obj.a.into()),
916                ("b".into(), obj.b.into()),
917                (
918                    "c".into(),
919                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
920                ),
921            ])
922        }
923    }
924
/// Reads a single-object encoded message back with `GenericSingleObjectReader`
/// and checks that the decoded `Value` equals the one that was encoded.
#[test]
fn test_avro_3507_single_object_reader() -> TestResult {
    let schema = TestSingleObjectReader::get_schema();
    let expected_value: Value = TestSingleObjectReader {
        a: 42,
        b: 3.33,
        c: vec!["cat".into(), "dog".into()],
    }
    .into();

    // Message layout used here: two marker bytes, the schema's Rabin
    // fingerprint bytes, then the encoded record.
    let mut message = vec![0xC3_u8, 0x01];
    message.extend_from_slice(&schema.fingerprint::<Rabin>().bytes[..]);
    encode(&expected_value, &schema, &mut message).expect("Encode should succeed");

    let generic_reader = GenericSingleObjectReader::new(schema).expect("Schema should resolve");
    let mut input = &message[..];
    let val = generic_reader
        .read_value(&mut input)
        .expect("Should read");
    assert_eq!(expected_value, val);

    Ok(())
}
956
/// Feeds the reader a message split over three chained readers and checks
/// that the decoded `Value` still equals the one that was encoded.
#[test]
fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
    let schema = TestSingleObjectReader::get_schema();
    let expected_value: Value = TestSingleObjectReader {
        a: 42,
        b: 3.33,
        c: vec!["cat".into(), "dog".into()],
    }
    .into();

    // The two-byte marker, to show that the message uses this single-record format
    let marker = [0xC3_u8, 0x01];
    let fingerprint = schema.fingerprint::<Rabin>().bytes[..].to_vec();
    let mut body = Vec::<u8>::new();
    encode(&expected_value, &schema, &mut body).expect("Encode should succeed");

    // Each section is its own reader, so one `read` call never returns bytes
    // from two sections at once.
    let mut to_read = (&marker[..]).chain(&fingerprint[..]).chain(&body[..]);

    let generic_reader = GenericSingleObjectReader::new(schema).expect("Schema should resolve");
    let val = generic_reader
        .read_value(&mut to_read)
        .expect("Should read");
    assert_eq!(expected_value, val);

    Ok(())
}
990
/// Decodes the same single-object message through the generic reader and
/// through both entry points of the specific reader, and checks that all
/// three agree with the object that was encoded.
#[test]
fn test_avro_3507_reader_parity() -> TestResult {
    let schema = TestSingleObjectReader::get_schema();
    let obj = TestSingleObjectReader {
        a: 42,
        b: 3.33,
        c: vec!["cat".into(), "dog".into()],
    };
    let expected_value = Value::from(obj.clone());

    // Two marker bytes, the schema's Rabin fingerprint bytes, then the record.
    let mut message = vec![0xC3_u8, 0x01];
    message.extend_from_slice(&schema.fingerprint::<Rabin>().bytes[..]);
    encode(&expected_value, &schema, &mut message).expect("Encode should succeed");

    let generic_reader = GenericSingleObjectReader::new(schema).expect("Schema should resolve");
    let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
        .expect("schema should resolve");

    // Every read starts from its own fresh slice over the same bytes.
    let mut generic_input = &message[..];
    let mut from_value_input = &message[..];
    let mut deserialize_input = &message[..];

    let val = generic_reader
        .read_value(&mut generic_input)
        .expect("Should read");
    let read_obj1 = specific_reader
        .read_from_value(&mut from_value_input)
        .expect("Should read from value");
    let read_obj2 = specific_reader
        .read(&mut deserialize_input)
        .expect("Should read from deserilize");

    assert_eq!(obj, read_obj1);
    assert_eq!(obj, read_obj2);
    assert_eq!(val, expected_value);

    Ok(())
}
1036
/// Opening a file whose header names the `snappy` codec must fail with a
/// descriptive error when the `snappy` feature is not compiled in.
#[cfg(not(feature = "snappy"))]
#[test]
fn test_avro_3549_read_not_enabled_codec() {
    // Starts with the bytes `Obj\x01`; the metadata that follows contains the
    // keys `avro.schema` and `avro.codec`, the latter with the value `snappy`.
    let snappy_compressed_avro: &[u8] = &[
        79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
        34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
        110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
        103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
        44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
        108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
        58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
        101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
        203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
        209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
    ];

    match Reader::new(snappy_compressed_avro) {
        Err(err) => assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string()),
        Ok(_) => panic!("Expected an error in the reading of the codec!"),
    }
}
1059}