apache_avro/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    decode::{decode, decode_internal},
22    error::Details,
23    from_value,
24    headers::{HeaderBuilder, RabinFingerprintHeader},
25    schema::{
26        AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
27        resolve_names_with_schemata,
28    },
29    types::Value,
30    util,
31};
32use log::warn;
33use serde::de::DeserializeOwned;
34use serde_json::from_slice;
35use std::{
36    collections::HashMap,
37    io::{ErrorKind, Read},
38    marker::PhantomData,
39    str::FromStr,
40};
41
/// Internal Block reader.
///
/// Bundles the underlying reader with the state parsed from the container header and the
/// bytes of the data block that is currently being iterated.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Source the header and the data blocks are read from.
    reader: R,
    /// Internal buffering to reduce allocation.
    buf: Vec<u8>,
    /// Offset into `buf` at which the next datum starts.
    buf_idx: usize,
    /// Number of elements expected to exist within this block.
    message_count: usize,
    /// Marker read from the header; every data block must be followed by the same bytes.
    marker: [u8; 16],
    /// Codec used to decompress block payloads, taken from the `avro.codec` header entry.
    codec: Codec,
    /// Schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Caller-supplied schemata used while parsing and resolving the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header entries whose key does not start with `avro.`.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Named schemas handed to the decoder.
    names_refs: Names,
}
58
59impl<'r, R: Read> Block<'r, R> {
60    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
61        let mut block = Block {
62            reader,
63            codec: Codec::Null,
64            writer_schema: Schema::Null,
65            schemata,
66            buf: vec![],
67            buf_idx: 0,
68            message_count: 0,
69            marker: [0; 16],
70            user_metadata: Default::default(),
71            names_refs: Default::default(),
72        };
73
74        block.read_header()?;
75        Ok(block)
76    }
77
78    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
79    /// its content.
80    fn read_header(&mut self) -> AvroResult<()> {
81        let mut buf = [0u8; 4];
82        self.reader
83            .read_exact(&mut buf)
84            .map_err(Details::ReadHeader)?;
85
86        if buf != [b'O', b'b', b'j', 1u8] {
87            return Err(Details::HeaderMagic.into());
88        }
89
90        let meta_schema = Schema::map(Schema::Bytes);
91        match decode(&meta_schema, &mut self.reader)? {
92            Value::Map(metadata) => {
93                self.read_writer_schema(&metadata)?;
94                self.codec = read_codec(&metadata)?;
95
96                for (key, value) in metadata {
97                    if key == "avro.schema"
98                        || key == "avro.codec"
99                        || key == "avro.codec.compression_level"
100                    {
101                        // already processed
102                    } else if key.starts_with("avro.") {
103                        warn!("Ignoring unknown metadata key: {key}");
104                    } else {
105                        self.read_user_metadata(key, value);
106                    }
107                }
108            }
109            _ => {
110                return Err(Details::GetHeaderMetadata.into());
111            }
112        }
113
114        self.reader
115            .read_exact(&mut self.marker)
116            .map_err(|e| Details::ReadMarker(e).into())
117    }
118
119    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
120        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
121        // invalid bytes.
122        //
123        // The are two cases to handle here:
124        //
125        // 1. `n > self.buf.len()`:
126        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
127        // 2. `n < self.buf.len()`:
128        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
129        //
130        // TODO: Figure out a way to avoid having to truncate for the second case.
131        self.buf.resize(util::safe_len(n)?, 0);
132        self.reader
133            .read_exact(&mut self.buf)
134            .map_err(Details::ReadIntoBuf)?;
135        self.buf_idx = 0;
136        Ok(())
137    }
138
139    /// Try to read a data block, also performing schema resolution for the objects contained in
140    /// the block. The objects are stored in an internal buffer to the `Reader`.
141    fn read_block_next(&mut self) -> AvroResult<()> {
142        assert!(self.is_empty(), "Expected self to be empty!");
143        match util::read_long(&mut self.reader).map_err(Error::into_details) {
144            Ok(block_len) => {
145                self.message_count = block_len as usize;
146                let block_bytes = util::read_long(&mut self.reader)?;
147                self.fill_buf(block_bytes as usize)?;
148                let mut marker = [0u8; 16];
149                self.reader
150                    .read_exact(&mut marker)
151                    .map_err(Details::ReadBlockMarker)?;
152
153                if marker != self.marker {
154                    return Err(Details::GetBlockMarker.into());
155                }
156
157                // NOTE (JAB): This doesn't fit this Reader pattern very well.
158                // `self.buf` is a growable buffer that is reused as the reader is iterated.
159                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
160                // and replace `buf` with the new one, instead of reusing the same buffer.
161                // We can address this by using some "limited read" type to decode directly
162                // into the buffer. But this is fine, for now.
163                self.codec.decompress(&mut self.buf)
164            }
165            Err(Details::ReadVariableIntegerBytes(io_err)) => {
166                if let ErrorKind::UnexpectedEof = io_err.kind() {
167                    // to not return any error in case we only finished to read cleanly from the stream
168                    Ok(())
169                } else {
170                    Err(Details::ReadVariableIntegerBytes(io_err).into())
171                }
172            }
173            Err(e) => Err(Error::new(e)),
174        }
175    }
176
    /// Number of objects of the current block that have not been returned yet.
    fn len(&self) -> usize {
        self.message_count
    }
180
181    fn is_empty(&self) -> bool {
182        self.len() == 0
183    }
184
185    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
186        if self.is_empty() {
187            self.read_block_next()?;
188            if self.is_empty() {
189                return Ok(None);
190            }
191        }
192
193        let mut block_bytes = &self.buf[self.buf_idx..];
194        let b_original = block_bytes.len();
195
196        let item = decode_internal(
197            &self.writer_schema,
198            &self.names_refs,
199            &None,
200            &mut block_bytes,
201        )?;
202        let item = match read_schema {
203            Some(schema) => item.resolve(schema)?,
204            None => item,
205        };
206
207        if b_original != 0 && b_original == block_bytes.len() {
208            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
209            return Err(Details::ReadBlock.into());
210        }
211        self.buf_idx += b_original - block_bytes.len();
212        self.message_count -= 1;
213        Ok(Some(item))
214    }
215
216    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
217        let json: serde_json::Value = metadata
218            .get("avro.schema")
219            .and_then(|bytes| {
220                if let Value::Bytes(ref bytes) = *bytes {
221                    from_slice(bytes.as_ref()).ok()
222                } else {
223                    None
224                }
225            })
226            .ok_or(Details::GetAvroSchemaFromMap)?;
227        if !self.schemata.is_empty() {
228            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
229            let names: Names = rs
230                .get_names()
231                .iter()
232                .map(|(name, schema)| (name.clone(), (*schema).clone()))
233                .collect();
234            self.writer_schema = Schema::parse_with_names(&json, names)?;
235            resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
236        } else {
237            self.writer_schema = Schema::parse(&json)?;
238            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
239        }
240        Ok(())
241    }
242
243    fn read_user_metadata(&mut self, key: String, value: Value) {
244        match value {
245            Value::Bytes(ref vec) => {
246                self.user_metadata.insert(key, vec.clone());
247            }
248            wrong => {
249                warn!("User metadata values must be Value::Bytes, found {wrong:?}");
250            }
251        }
252    }
253}
254
255fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
256    let result = metadata
257        .get("avro.codec")
258        .map(|codec| {
259            if let Value::Bytes(ref bytes) = *codec {
260                match std::str::from_utf8(bytes.as_ref()) {
261                    Ok(utf8) => Ok(utf8),
262                    Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
263                }
264            } else {
265                Err(Details::BadCodecMetadata.into())
266            }
267        })
268        .map(|codec_res| match codec_res {
269            Ok(codec) => match Codec::from_str(codec) {
270                Ok(codec) => match codec {
271                    #[cfg(feature = "bzip")]
272                    Codec::Bzip2(_) => {
273                        use crate::Bzip2Settings;
274                        if let Some(Value::Bytes(bytes)) =
275                            metadata.get("avro.codec.compression_level")
276                        {
277                            Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
278                        } else {
279                            Ok(codec)
280                        }
281                    }
282                    #[cfg(feature = "xz")]
283                    Codec::Xz(_) => {
284                        use crate::XzSettings;
285                        if let Some(Value::Bytes(bytes)) =
286                            metadata.get("avro.codec.compression_level")
287                        {
288                            Ok(Codec::Xz(XzSettings::new(bytes[0])))
289                        } else {
290                            Ok(codec)
291                        }
292                    }
293                    #[cfg(feature = "zstandard")]
294                    Codec::Zstandard(_) => {
295                        use crate::ZstandardSettings;
296                        if let Some(Value::Bytes(bytes)) =
297                            metadata.get("avro.codec.compression_level")
298                        {
299                            Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
300                        } else {
301                            Ok(codec)
302                        }
303                    }
304                    _ => Ok(codec),
305                },
306                Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
307            },
308            Err(err) => Err(err),
309        });
310
311    result.unwrap_or(Ok(Codec::Null))
312}
313
/// Main interface for reading Avro formatted values.
///
/// To be used as an iterator:
///
/// ```no_run
/// # use apache_avro::Reader;
/// # use std::io::Cursor;
/// # let input = Cursor::new(Vec::<u8>::new());
/// for value in Reader::new(input).unwrap() {
///     match value {
///         Ok(v) => println!("{:?}", v),
///         Err(e) => println!("Error: {}", e),
///     };
/// }
/// ```
pub struct Reader<'a, R> {
    /// Block-level reader holding the underlying source and the state read from the header.
    block: Block<'a, R>,
    /// Schema supplied by the caller, if any.
    reader_schema: Option<&'a Schema>,
    /// Set once the iterator has yielded an error; no further values are produced afterwards.
    errored: bool,
    /// `true` when a reader schema was supplied and it differs from the writer schema.
    should_resolve_schema: bool,
}
335
336impl<'a, R: Read> Reader<'a, R> {
337    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
338    /// No reader `Schema` will be set.
339    ///
340    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
341    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
342        let block = Block::new(reader, vec![])?;
343        let reader = Reader {
344            block,
345            reader_schema: None,
346            errored: false,
347            should_resolve_schema: false,
348        };
349        Ok(reader)
350    }
351
352    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
353    /// to read from.
354    ///
355    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
356    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
357        let block = Block::new(reader, vec![schema])?;
358        let mut reader = Reader {
359            block,
360            reader_schema: Some(schema),
361            errored: false,
362            should_resolve_schema: false,
363        };
364        // Check if the reader and writer schemas disagree.
365        reader.should_resolve_schema = reader.writer_schema() != schema;
366        Ok(reader)
367    }
368
369    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
370    /// to read from.
371    ///
372    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
373    pub fn with_schemata(
374        schema: &'a Schema,
375        schemata: Vec<&'a Schema>,
376        reader: R,
377    ) -> AvroResult<Reader<'a, R>> {
378        let block = Block::new(reader, schemata)?;
379        let mut reader = Reader {
380            block,
381            reader_schema: Some(schema),
382            errored: false,
383            should_resolve_schema: false,
384        };
385        // Check if the reader and writer schemas disagree.
386        reader.should_resolve_schema = reader.writer_schema() != schema;
387        Ok(reader)
388    }
389
390    /// Get a reference to the writer `Schema`.
391    #[inline]
392    pub fn writer_schema(&self) -> &Schema {
393        &self.block.writer_schema
394    }
395
396    /// Get a reference to the optional reader `Schema`.
397    #[inline]
398    pub fn reader_schema(&self) -> Option<&Schema> {
399        self.reader_schema
400    }
401
402    /// Get a reference to the user metadata
403    #[inline]
404    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
405        &self.block.user_metadata
406    }
407
408    #[inline]
409    fn read_next(&mut self) -> AvroResult<Option<Value>> {
410        let read_schema = if self.should_resolve_schema {
411            self.reader_schema
412        } else {
413            None
414        };
415
416        self.block.read_next(read_schema)
417    }
418}
419
420impl<R: Read> Iterator for Reader<'_, R> {
421    type Item = AvroResult<Value>;
422
423    fn next(&mut self) -> Option<Self::Item> {
424        // to prevent keep on reading after the first error occurs
425        if self.errored {
426            return None;
427        };
428        match self.read_next() {
429            Ok(opt) => opt.map(Ok),
430            Err(e) => {
431                self.errored = true;
432                Some(Err(e))
433            }
434        }
435    }
436}
437
438/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
439/// to read from.
440///
441/// In case a reader `Schema` is provided, schema resolution will also be performed.
442///
443/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
444/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
445/// you are doing, instead.
446pub fn from_avro_datum<R: Read>(
447    writer_schema: &Schema,
448    reader: &mut R,
449    reader_schema: Option<&Schema>,
450) -> AvroResult<Value> {
451    let value = decode(writer_schema, reader)?;
452    match reader_schema {
453        Some(schema) => value.resolve(schema),
454        None => Ok(value),
455    }
456}
457
458/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
459/// to read from.
460/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
461/// schemata to resolve any dependencies.
462///
463/// In case a reader `Schema` is provided, schema resolution will also be performed.
464pub fn from_avro_datum_schemata<R: Read>(
465    writer_schema: &Schema,
466    writer_schemata: Vec<&Schema>,
467    reader: &mut R,
468    reader_schema: Option<&Schema>,
469) -> AvroResult<Value> {
470    from_avro_datum_reader_schemata(
471        writer_schema,
472        writer_schemata,
473        reader,
474        reader_schema,
475        Vec::with_capacity(0),
476    )
477}
478
479/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
480/// to read from.
481/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
482/// schemata to resolve any dependencies.
483///
484/// In case a reader `Schema` is provided, schema resolution will also be performed.
485pub fn from_avro_datum_reader_schemata<R: Read>(
486    writer_schema: &Schema,
487    writer_schemata: Vec<&Schema>,
488    reader: &mut R,
489    reader_schema: Option<&Schema>,
490    reader_schemata: Vec<&Schema>,
491) -> AvroResult<Value> {
492    let rs = ResolvedSchema::try_from(writer_schemata)?;
493    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
494    match reader_schema {
495        Some(schema) => {
496            if reader_schemata.is_empty() {
497                value.resolve(schema)
498            } else {
499                value.resolve_schemata(schema, reader_schemata)
500            }
501        }
502        None => Ok(value),
503    }
504}
505
506pub struct GenericSingleObjectReader {
507    write_schema: ResolvedOwnedSchema,
508    expected_header: Vec<u8>,
509}
510
511impl GenericSingleObjectReader {
512    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
513        let header_builder = RabinFingerprintHeader::from_schema(&schema);
514        Self::new_with_header_builder(schema, header_builder)
515    }
516
517    pub fn new_with_header_builder<HB: HeaderBuilder>(
518        schema: Schema,
519        header_builder: HB,
520    ) -> AvroResult<GenericSingleObjectReader> {
521        let expected_header = header_builder.build_header();
522        Ok(GenericSingleObjectReader {
523            write_schema: ResolvedOwnedSchema::try_from(schema)?,
524            expected_header,
525        })
526    }
527
528    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
529        let mut header = vec![0; self.expected_header.len()];
530        match reader.read_exact(&mut header) {
531            Ok(_) => {
532                if self.expected_header == header {
533                    decode_internal(
534                        self.write_schema.get_root_schema(),
535                        self.write_schema.get_names(),
536                        &None,
537                        reader,
538                    )
539                } else {
540                    Err(
541                        Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
542                            .into(),
543                    )
544                }
545            }
546            Err(io_error) => Err(Details::ReadHeader(io_error).into()),
547        }
548    }
549}
550
551pub struct SpecificSingleObjectReader<T>
552where
553    T: AvroSchema,
554{
555    inner: GenericSingleObjectReader,
556    _model: PhantomData<T>,
557}
558
559impl<T> SpecificSingleObjectReader<T>
560where
561    T: AvroSchema,
562{
563    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
564        Ok(SpecificSingleObjectReader {
565            inner: GenericSingleObjectReader::new(T::get_schema())?,
566            _model: PhantomData,
567        })
568    }
569}
570
571impl<T> SpecificSingleObjectReader<T>
572where
573    T: AvroSchema + From<Value>,
574{
575    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
576        self.inner.read_value(reader).map(|v| v.into())
577    }
578}
579
580impl<T> SpecificSingleObjectReader<T>
581where
582    T: AvroSchema + DeserializeOwned,
583{
584    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
585        from_value::<T>(&self.inner.read_value(reader)?)
586    }
587}
588
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`
///
/// The marker is taken from the last 16 bytes of `bytes`.
///
/// # Panics
///
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    // `u8` is `Copy`, so the trailing bytes can be copied directly.
    marker.copy_from_slice(&bytes[(bytes.len() - 16)..]);
    marker
}
599
600#[cfg(test)]
601mod tests {
602    use super::*;
603    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
604    use apache_avro_test_helper::TestResult;
605    use pretty_assertions::assert_eq;
606    use serde::Deserialize;
607    use std::io::Cursor;
608    use uuid::Uuid;
609
    // Record schema shared by the tests: a long `a` (default 42) and a string `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    // Union of `null` and `long`, used by `test_null_union`.
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    // A complete container file: the `Obj\x01` magic, a metadata map with the `avro.schema`
    // and `avro.codec` (`null`) entries, a 16-byte marker, and one data block holding two
    // records (`a = 27, b = "foo"` and `a = 42, b = "bar"`) followed by the same marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];
644
645    #[test]
646    fn test_from_avro_datum() -> TestResult {
647        let schema = Schema::parse_str(SCHEMA)?;
648        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
649
650        let mut record = Record::new(&schema).unwrap();
651        record.put("a", 27i64);
652        record.put("b", "foo");
653        let expected = record.into();
654
655        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
656
657        Ok(())
658    }
659
660    #[test]
661    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
662        const TEST_RECORD_SCHEMA_3240: &str = r#"
663    {
664      "type": "record",
665      "name": "test",
666      "fields": [
667        {
668          "name": "a",
669          "type": "long",
670          "default": 42
671        },
672        {
673          "name": "b",
674          "type": "string"
675        },
676        {
677            "name": "a_nullable_array",
678            "type": ["null", {"type": "array", "items": {"type": "string"}}],
679            "default": null
680        },
681        {
682            "name": "a_nullable_boolean",
683            "type": ["null", {"type": "boolean"}],
684            "default": null
685        },
686        {
687            "name": "a_nullable_string",
688            "type": ["null", {"type": "string"}],
689            "default": null
690        }
691      ]
692    }
693    "#;
694        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
695        struct TestRecord3240 {
696            a: i64,
697            b: String,
698            a_nullable_array: Option<Vec<String>>,
699            // we are missing the 'a_nullable_boolean' field to simulate missing keys
700            // a_nullable_boolean: Option<bool>,
701            a_nullable_string: Option<String>,
702        }
703
704        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
705        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
706
707        let expected_record: TestRecord3240 = TestRecord3240 {
708            a: 27i64,
709            b: String::from("foo"),
710            a_nullable_array: None,
711            a_nullable_string: None,
712        };
713
714        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
715        let parsed_record: TestRecord3240 = match &avro_datum {
716            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
717            unexpected => {
718                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
719            }
720        };
721
722        assert_eq!(parsed_record, expected_record);
723
724        Ok(())
725    }
726
727    #[test]
728    fn test_null_union() -> TestResult {
729        let schema = Schema::parse_str(UNION_SCHEMA)?;
730        let mut encoded: &'static [u8] = &[2, 0];
731
732        assert_eq!(
733            from_avro_datum(&schema, &mut encoded, None)?,
734            Value::Union(1, Box::new(Value::Long(0)))
735        );
736
737        Ok(())
738    }
739
740    #[test]
741    fn test_reader_iterator() -> TestResult {
742        let schema = Schema::parse_str(SCHEMA)?;
743        let reader = Reader::with_schema(&schema, ENCODED)?;
744
745        let mut record1 = Record::new(&schema).unwrap();
746        record1.put("a", 27i64);
747        record1.put("b", "foo");
748
749        let mut record2 = Record::new(&schema).unwrap();
750        record2.put("a", 42i64);
751        record2.put("b", "bar");
752
753        let expected = [record1.into(), record2.into()];
754
755        for (i, value) in reader.enumerate() {
756            assert_eq!(value?, expected[i]);
757        }
758
759        Ok(())
760    }
761
762    #[test]
763    fn test_reader_invalid_header() -> TestResult {
764        let schema = Schema::parse_str(SCHEMA)?;
765        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
766        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
767
768        Ok(())
769    }
770
771    #[test]
772    fn test_reader_invalid_block() -> TestResult {
773        let schema = Schema::parse_str(SCHEMA)?;
774        let invalid = ENCODED
775            .iter()
776            .copied()
777            .rev()
778            .skip(19)
779            .collect::<Vec<u8>>()
780            .into_iter()
781            .rev()
782            .collect::<Vec<u8>>();
783        let reader = Reader::with_schema(&schema, &invalid[..])?;
784        for value in reader {
785            assert!(value.is_err());
786        }
787
788        Ok(())
789    }
790
791    #[test]
792    fn test_reader_empty_buffer() -> TestResult {
793        let empty = Cursor::new(Vec::new());
794        assert!(Reader::new(empty).is_err());
795
796        Ok(())
797    }
798
799    #[test]
800    fn test_reader_only_header() -> TestResult {
801        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
802        let reader = Reader::new(&invalid[..])?;
803        for value in reader {
804            assert!(value.is_err());
805        }
806
807        Ok(())
808    }
809
810    #[test]
811    fn test_avro_3405_read_user_metadata_success() -> TestResult {
812        use crate::writer::Writer;
813
814        let schema = Schema::parse_str(SCHEMA)?;
815        let mut writer = Writer::new(&schema, Vec::new());
816
817        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
818        user_meta_data.insert(
819            "stringKey".to_string(),
820            "stringValue".to_string().into_bytes(),
821        );
822        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
823        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
824
825        for (k, v) in user_meta_data.iter() {
826            writer.add_user_metadata(k.to_string(), v)?;
827        }
828
829        let mut record = Record::new(&schema).unwrap();
830        record.put("a", 27i64);
831        record.put("b", "foo");
832
833        writer.append(record.clone())?;
834        writer.append(record.clone())?;
835        writer.flush()?;
836        let result = writer.into_inner()?;
837
838        let reader = Reader::new(&result[..])?;
839        assert_eq!(reader.user_metadata(), &user_meta_data);
840
841        Ok(())
842    }
843
    /// Record type used by the single-object reader tests; its fields mirror the schema
    /// returned by its `AvroSchema` implementation.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
850
851    impl AvroSchema for TestSingleObjectReader {
852        fn get_schema() -> Schema {
853            let schema = r#"
854            {
855                "type":"record",
856                "name":"TestSingleObjectWrtierSerialize",
857                "fields":[
858                    {
859                        "name":"a",
860                        "type":"long"
861                    },
862                    {
863                        "name":"b",
864                        "type":"double"
865                    },
866                    {
867                        "name":"c",
868                        "type":{
869                            "type":"array",
870                            "items":"string"
871                        }
872                    }
873                ]
874            }
875            "#;
876            Schema::parse_str(schema).unwrap()
877        }
878    }
879
880    impl From<Value> for TestSingleObjectReader {
881        fn from(obj: Value) -> TestSingleObjectReader {
882            if let Value::Record(fields) = obj {
883                let mut a = None;
884                let mut b = None;
885                let mut c = vec![];
886                for (field_name, v) in fields {
887                    match (field_name.as_str(), v) {
888                        ("a", Value::Long(i)) => a = Some(i),
889                        ("b", Value::Double(d)) => b = Some(d),
890                        ("c", Value::Array(v)) => {
891                            for inner_val in v {
892                                if let Value::String(s) = inner_val {
893                                    c.push(s);
894                                }
895                            }
896                        }
897                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
898                    }
899                }
900                TestSingleObjectReader {
901                    a: a.unwrap(),
902                    b: b.unwrap(),
903                    c,
904                }
905            } else {
906                panic!("Expected a Value::Record but was {obj:?}")
907            }
908        }
909    }
910
911    impl From<TestSingleObjectReader> for Value {
912        fn from(obj: TestSingleObjectReader) -> Value {
913            Value::Record(vec![
914                ("a".into(), obj.a.into()),
915                ("b".into(), obj.b.into()),
916                (
917                    "c".into(),
918                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
919                ),
920            ])
921        }
922    }
923
924    #[test]
925    fn test_avro_3507_single_object_reader() -> TestResult {
926        let obj = TestSingleObjectReader {
927            a: 42,
928            b: 3.33,
929            c: vec!["cat".into(), "dog".into()],
930        };
931        let mut to_read = Vec::<u8>::new();
932        to_read.extend_from_slice(&[0xC3, 0x01]);
933        to_read.extend_from_slice(
934            &TestSingleObjectReader::get_schema()
935                .fingerprint::<Rabin>()
936                .bytes[..],
937        );
938        encode(
939            &obj.clone().into(),
940            &TestSingleObjectReader::get_schema(),
941            &mut to_read,
942        )
943        .expect("Encode should succeed");
944        let mut to_read = &to_read[..];
945        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
946            .expect("Schema should resolve");
947        let val = generic_reader
948            .read_value(&mut to_read)
949            .expect("Should read");
950        let expected_value: Value = obj.into();
951        assert_eq!(expected_value, val);
952
953        Ok(())
954    }
955
956    #[test]
957    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
958        let obj = TestSingleObjectReader {
959            a: 42,
960            b: 3.33,
961            c: vec!["cat".into(), "dog".into()],
962        };
963        // The two-byte marker, to show that the message uses this single-record format
964        let to_read_1 = [0xC3, 0x01];
965        let mut to_read_2 = Vec::<u8>::new();
966        to_read_2.extend_from_slice(
967            &TestSingleObjectReader::get_schema()
968                .fingerprint::<Rabin>()
969                .bytes[..],
970        );
971        let mut to_read_3 = Vec::<u8>::new();
972        encode(
973            &obj.clone().into(),
974            &TestSingleObjectReader::get_schema(),
975            &mut to_read_3,
976        )
977        .expect("Encode should succeed");
978        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
979        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
980            .expect("Schema should resolve");
981        let val = generic_reader
982            .read_value(&mut to_read)
983            .expect("Should read");
984        let expected_value: Value = obj.into();
985        assert_eq!(expected_value, val);
986
987        Ok(())
988    }
989
990    #[test]
991    fn test_avro_3507_reader_parity() -> TestResult {
992        let obj = TestSingleObjectReader {
993            a: 42,
994            b: 3.33,
995            c: vec!["cat".into(), "dog".into()],
996        };
997
998        let mut to_read = Vec::<u8>::new();
999        to_read.extend_from_slice(&[0xC3, 0x01]);
1000        to_read.extend_from_slice(
1001            &TestSingleObjectReader::get_schema()
1002                .fingerprint::<Rabin>()
1003                .bytes[..],
1004        );
1005        encode(
1006            &obj.clone().into(),
1007            &TestSingleObjectReader::get_schema(),
1008            &mut to_read,
1009        )
1010        .expect("Encode should succeed");
1011        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
1012            .expect("Schema should resolve");
1013        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
1014            .expect("schema should resolve");
1015        let mut to_read1 = &to_read[..];
1016        let mut to_read2 = &to_read[..];
1017        let mut to_read3 = &to_read[..];
1018
1019        let val = generic_reader
1020            .read_value(&mut to_read1)
1021            .expect("Should read");
1022        let read_obj1 = specific_reader
1023            .read_from_value(&mut to_read2)
1024            .expect("Should read from value");
1025        let read_obj2 = specific_reader
1026            .read(&mut to_read3)
1027            .expect("Should read from deserilize");
1028        let expected_value: Value = obj.clone().into();
1029        assert_eq!(obj, read_obj1);
1030        assert_eq!(obj, read_obj2);
1031        assert_eq!(val, expected_value);
1032
1033        Ok(())
1034    }
1035
1036    #[test]
1037    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
1038        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1039        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1040        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
1041            TestSingleObjectReader::get_schema(),
1042            header_builder,
1043        )
1044        .expect("failed to build reader");
1045        let data_to_read: Vec<u8> = vec![
1046            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
1047        ];
1048        let mut to_read = &data_to_read[..];
1049        let read_result = generic_reader
1050            .read_value(&mut to_read)
1051            .map_err(Error::into_details);
1052        matches!(read_result, Err(Details::ReadBytes(_)));
1053        Ok(())
1054    }
1055
1056    #[cfg(not(feature = "snappy"))]
1057    #[test]
1058    fn test_avro_3549_read_not_enabled_codec() {
1059        let snappy_compressed_avro = vec![
1060            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
1061            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
1062            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
1063            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
1064            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
1065            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
1066            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
1067            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
1068            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
1069            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
1070        ];
1071
1072        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
1073            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
1074        } else {
1075            panic!("Expected an error in the reading of the codec!");
1076        }
1077    }
1078}