apache_avro/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    decode::{decode, decode_internal},
22    error::Details,
23    from_value,
24    headers::{HeaderBuilder, RabinFingerprintHeader},
25    schema::{
26        AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
27        resolve_names_with_schemata,
28    },
29    types::Value,
30    util,
31};
32use log::warn;
33use serde::de::DeserializeOwned;
34use serde_json::from_slice;
35use std::{
36    collections::HashMap,
37    io::{ErrorKind, Read},
38    marker::PhantomData,
39    str::FromStr,
40};
41
42/// Internal Block reader.
43#[derive(Debug, Clone)]
44struct Block<'r, R> {
45    reader: R,
46    /// Internal buffering to reduce allocation.
47    buf: Vec<u8>,
48    buf_idx: usize,
49    /// Number of elements expected to exist within this block.
50    message_count: usize,
51    marker: [u8; 16],
52    codec: Codec,
53    writer_schema: Schema,
54    schemata: Vec<&'r Schema>,
55    user_metadata: HashMap<String, Vec<u8>>,
56    names_refs: Names,
57}
58
59impl<'r, R: Read> Block<'r, R> {
60    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
61        let mut block = Block {
62            reader,
63            codec: Codec::Null,
64            writer_schema: Schema::Null,
65            schemata,
66            buf: vec![],
67            buf_idx: 0,
68            message_count: 0,
69            marker: [0; 16],
70            user_metadata: Default::default(),
71            names_refs: Default::default(),
72        };
73
74        block.read_header()?;
75        Ok(block)
76    }
77
78    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
79    /// its content.
80    fn read_header(&mut self) -> AvroResult<()> {
81        let mut buf = [0u8; 4];
82        self.reader
83            .read_exact(&mut buf)
84            .map_err(Details::ReadHeader)?;
85
86        if buf != [b'O', b'b', b'j', 1u8] {
87            return Err(Details::HeaderMagic.into());
88        }
89
90        let meta_schema = Schema::map(Schema::Bytes);
91        match decode(&meta_schema, &mut self.reader)? {
92            Value::Map(metadata) => {
93                self.read_writer_schema(&metadata)?;
94                self.codec = read_codec(&metadata)?;
95
96                for (key, value) in metadata {
97                    if key == "avro.schema"
98                        || key == "avro.codec"
99                        || key == "avro.codec.compression_level"
100                    {
101                        // already processed
102                    } else if key.starts_with("avro.") {
103                        warn!("Ignoring unknown metadata key: {key}");
104                    } else {
105                        self.read_user_metadata(key, value);
106                    }
107                }
108            }
109            _ => {
110                return Err(Details::GetHeaderMetadata.into());
111            }
112        }
113
114        self.reader
115            .read_exact(&mut self.marker)
116            .map_err(|e| Details::ReadMarker(e).into())
117    }
118
119    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
120        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
121        // invalid bytes.
122        //
123        // The are two cases to handle here:
124        //
125        // 1. `n > self.buf.len()`:
126        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
127        // 2. `n < self.buf.len()`:
128        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
129        //
130        // TODO: Figure out a way to avoid having to truncate for the second case.
131        self.buf.resize(util::safe_len(n)?, 0);
132        self.reader
133            .read_exact(&mut self.buf)
134            .map_err(Details::ReadIntoBuf)?;
135        self.buf_idx = 0;
136        Ok(())
137    }
138
139    /// Try to read a data block, also performing schema resolution for the objects contained in
140    /// the block. The objects are stored in an internal buffer to the `Reader`.
141    fn read_block_next(&mut self) -> AvroResult<()> {
142        assert!(self.is_empty(), "Expected self to be empty!");
143        match util::read_long(&mut self.reader).map_err(Error::into_details) {
144            Ok(block_len) => {
145                self.message_count = block_len as usize;
146                let block_bytes = util::read_long(&mut self.reader)?;
147                self.fill_buf(block_bytes as usize)?;
148                let mut marker = [0u8; 16];
149                self.reader
150                    .read_exact(&mut marker)
151                    .map_err(Details::ReadBlockMarker)?;
152
153                if marker != self.marker {
154                    return Err(Details::GetBlockMarker.into());
155                }
156
157                // NOTE (JAB): This doesn't fit this Reader pattern very well.
158                // `self.buf` is a growable buffer that is reused as the reader is iterated.
159                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
160                // and replace `buf` with the new one, instead of reusing the same buffer.
161                // We can address this by using some "limited read" type to decode directly
162                // into the buffer. But this is fine, for now.
163                self.codec.decompress(&mut self.buf)
164            }
165            Err(Details::ReadVariableIntegerBytes(io_err)) => {
166                if let ErrorKind::UnexpectedEof = io_err.kind() {
167                    // to not return any error in case we only finished to read cleanly from the stream
168                    Ok(())
169                } else {
170                    Err(Details::ReadVariableIntegerBytes(io_err).into())
171                }
172            }
173            Err(e) => Err(Error::new(e)),
174        }
175    }
176
177    fn len(&self) -> usize {
178        self.message_count
179    }
180
181    fn is_empty(&self) -> bool {
182        self.len() == 0
183    }
184
185    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
186        if self.is_empty() {
187            self.read_block_next()?;
188            if self.is_empty() {
189                return Ok(None);
190            }
191        }
192
193        let mut block_bytes = &self.buf[self.buf_idx..];
194        let b_original = block_bytes.len();
195
196        let item = decode_internal(
197            &self.writer_schema,
198            &self.names_refs,
199            &None,
200            &mut block_bytes,
201        )?;
202        let item = match read_schema {
203            Some(schema) => item.resolve(schema)?,
204            None => item,
205        };
206
207        if b_original != 0 && b_original == block_bytes.len() {
208            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
209            return Err(Details::ReadBlock.into());
210        }
211        self.buf_idx += b_original - block_bytes.len();
212        self.message_count -= 1;
213        Ok(Some(item))
214    }
215
216    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
217        let json: serde_json::Value = metadata
218            .get("avro.schema")
219            .and_then(|bytes| {
220                if let Value::Bytes(ref bytes) = *bytes {
221                    from_slice(bytes.as_ref()).ok()
222                } else {
223                    None
224                }
225            })
226            .ok_or(Details::GetAvroSchemaFromMap)?;
227        if !self.schemata.is_empty() {
228            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
229            let names: Names = rs
230                .get_names()
231                .iter()
232                .map(|(name, schema)| (name.clone(), (*schema).clone()))
233                .collect();
234            self.writer_schema = Schema::parse_with_names(&json, names)?;
235            resolve_names_with_schemata(
236                self.schemata.iter().copied(),
237                &mut self.names_refs,
238                &None,
239            )?;
240        } else {
241            self.writer_schema = Schema::parse(&json)?;
242            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
243        }
244        Ok(())
245    }
246
247    fn read_user_metadata(&mut self, key: String, value: Value) {
248        match value {
249            Value::Bytes(ref vec) => {
250                self.user_metadata.insert(key, vec.clone());
251            }
252            wrong => {
253                warn!("User metadata values must be Value::Bytes, found {wrong:?}");
254            }
255        }
256    }
257}
258
259fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
260    let result = metadata
261        .get("avro.codec")
262        .map(|codec| {
263            if let Value::Bytes(ref bytes) = *codec {
264                match std::str::from_utf8(bytes.as_ref()) {
265                    Ok(utf8) => Ok(utf8),
266                    Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
267                }
268            } else {
269                Err(Details::BadCodecMetadata.into())
270            }
271        })
272        .map(|codec_res| match codec_res {
273            Ok(codec) => match Codec::from_str(codec) {
274                Ok(codec) => match codec {
275                    #[cfg(feature = "bzip")]
276                    Codec::Bzip2(_) => {
277                        use crate::Bzip2Settings;
278                        if let Some(Value::Bytes(bytes)) =
279                            metadata.get("avro.codec.compression_level")
280                        {
281                            Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
282                        } else {
283                            Ok(codec)
284                        }
285                    }
286                    #[cfg(feature = "xz")]
287                    Codec::Xz(_) => {
288                        use crate::XzSettings;
289                        if let Some(Value::Bytes(bytes)) =
290                            metadata.get("avro.codec.compression_level")
291                        {
292                            Ok(Codec::Xz(XzSettings::new(bytes[0])))
293                        } else {
294                            Ok(codec)
295                        }
296                    }
297                    #[cfg(feature = "zstandard")]
298                    Codec::Zstandard(_) => {
299                        use crate::ZstandardSettings;
300                        if let Some(Value::Bytes(bytes)) =
301                            metadata.get("avro.codec.compression_level")
302                        {
303                            Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
304                        } else {
305                            Ok(codec)
306                        }
307                    }
308                    _ => Ok(codec),
309                },
310                Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
311            },
312            Err(err) => Err(err),
313        });
314
315    result.unwrap_or(Ok(Codec::Null))
316}
317
318/// Main interface for reading Avro formatted values.
319///
320/// To be used as an iterator:
321///
322/// ```no_run
323/// # use apache_avro::Reader;
324/// # use std::io::Cursor;
325/// # let input = Cursor::new(Vec::<u8>::new());
326/// for value in Reader::new(input).unwrap() {
327///     match value {
328///         Ok(v) => println!("{:?}", v),
329///         Err(e) => println!("Error: {}", e),
330///     };
331/// }
332/// ```
333pub struct Reader<'a, R> {
334    block: Block<'a, R>,
335    reader_schema: Option<&'a Schema>,
336    errored: bool,
337    should_resolve_schema: bool,
338}
339
340impl<'a, R: Read> Reader<'a, R> {
341    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
342    /// No reader `Schema` will be set.
343    ///
344    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
345    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
346        let block = Block::new(reader, vec![])?;
347        let reader = Reader {
348            block,
349            reader_schema: None,
350            errored: false,
351            should_resolve_schema: false,
352        };
353        Ok(reader)
354    }
355
356    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
357    /// to read from.
358    ///
359    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
360    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
361        let block = Block::new(reader, vec![schema])?;
362        let mut reader = Reader {
363            block,
364            reader_schema: Some(schema),
365            errored: false,
366            should_resolve_schema: false,
367        };
368        // Check if the reader and writer schemas disagree.
369        reader.should_resolve_schema = reader.writer_schema() != schema;
370        Ok(reader)
371    }
372
373    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
374    /// to read from.
375    ///
376    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
377    pub fn with_schemata(
378        schema: &'a Schema,
379        schemata: Vec<&'a Schema>,
380        reader: R,
381    ) -> AvroResult<Reader<'a, R>> {
382        let block = Block::new(reader, schemata)?;
383        let mut reader = Reader {
384            block,
385            reader_schema: Some(schema),
386            errored: false,
387            should_resolve_schema: false,
388        };
389        // Check if the reader and writer schemas disagree.
390        reader.should_resolve_schema = reader.writer_schema() != schema;
391        Ok(reader)
392    }
393
394    /// Get a reference to the writer `Schema`.
395    #[inline]
396    pub fn writer_schema(&self) -> &Schema {
397        &self.block.writer_schema
398    }
399
400    /// Get a reference to the optional reader `Schema`.
401    #[inline]
402    pub fn reader_schema(&self) -> Option<&Schema> {
403        self.reader_schema
404    }
405
406    /// Get a reference to the user metadata
407    #[inline]
408    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
409        &self.block.user_metadata
410    }
411
412    #[inline]
413    fn read_next(&mut self) -> AvroResult<Option<Value>> {
414        let read_schema = if self.should_resolve_schema {
415            self.reader_schema
416        } else {
417            None
418        };
419
420        self.block.read_next(read_schema)
421    }
422}
423
424impl<R: Read> Iterator for Reader<'_, R> {
425    type Item = AvroResult<Value>;
426
427    fn next(&mut self) -> Option<Self::Item> {
428        // to prevent keep on reading after the first error occurs
429        if self.errored {
430            return None;
431        };
432        match self.read_next() {
433            Ok(opt) => opt.map(Ok),
434            Err(e) => {
435                self.errored = true;
436                Some(Err(e))
437            }
438        }
439    }
440}
441
442/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
443/// to read from.
444///
445/// In case a reader `Schema` is provided, schema resolution will also be performed.
446///
447/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
448/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
449/// you are doing, instead.
450pub fn from_avro_datum<R: Read>(
451    writer_schema: &Schema,
452    reader: &mut R,
453    reader_schema: Option<&Schema>,
454) -> AvroResult<Value> {
455    let value = decode(writer_schema, reader)?;
456    match reader_schema {
457        Some(schema) => value.resolve(schema),
458        None => Ok(value),
459    }
460}
461
462/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
463/// to read from.
464/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
465/// schemata to resolve any dependencies.
466///
467/// In case a reader `Schema` is provided, schema resolution will also be performed.
468pub fn from_avro_datum_schemata<R: Read>(
469    writer_schema: &Schema,
470    writer_schemata: Vec<&Schema>,
471    reader: &mut R,
472    reader_schema: Option<&Schema>,
473) -> AvroResult<Value> {
474    from_avro_datum_reader_schemata(
475        writer_schema,
476        writer_schemata,
477        reader,
478        reader_schema,
479        Vec::with_capacity(0),
480    )
481}
482
483/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
484/// to read from.
485/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
486/// schemata to resolve any dependencies.
487///
488/// In case a reader `Schema` is provided, schema resolution will also be performed.
489pub fn from_avro_datum_reader_schemata<R: Read>(
490    writer_schema: &Schema,
491    writer_schemata: Vec<&Schema>,
492    reader: &mut R,
493    reader_schema: Option<&Schema>,
494    reader_schemata: Vec<&Schema>,
495) -> AvroResult<Value> {
496    let rs = ResolvedSchema::try_from(writer_schemata)?;
497    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
498    match reader_schema {
499        Some(schema) => {
500            if reader_schemata.is_empty() {
501                value.resolve(schema)
502            } else {
503                value.resolve_schemata(schema, reader_schemata)
504            }
505        }
506        None => Ok(value),
507    }
508}
509
510pub struct GenericSingleObjectReader {
511    write_schema: ResolvedOwnedSchema,
512    expected_header: Vec<u8>,
513}
514
515impl GenericSingleObjectReader {
516    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
517        let header_builder = RabinFingerprintHeader::from_schema(&schema);
518        Self::new_with_header_builder(schema, header_builder)
519    }
520
521    pub fn new_with_header_builder<HB: HeaderBuilder>(
522        schema: Schema,
523        header_builder: HB,
524    ) -> AvroResult<GenericSingleObjectReader> {
525        let expected_header = header_builder.build_header();
526        Ok(GenericSingleObjectReader {
527            write_schema: ResolvedOwnedSchema::try_from(schema)?,
528            expected_header,
529        })
530    }
531
532    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
533        let mut header = vec![0; self.expected_header.len()];
534        match reader.read_exact(&mut header) {
535            Ok(_) => {
536                if self.expected_header == header {
537                    decode_internal(
538                        self.write_schema.get_root_schema(),
539                        self.write_schema.get_names(),
540                        &None,
541                        reader,
542                    )
543                } else {
544                    Err(
545                        Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
546                            .into(),
547                    )
548                }
549            }
550            Err(io_error) => Err(Details::ReadHeader(io_error).into()),
551        }
552    }
553}
554
555pub struct SpecificSingleObjectReader<T>
556where
557    T: AvroSchema,
558{
559    inner: GenericSingleObjectReader,
560    _model: PhantomData<T>,
561}
562
563impl<T> SpecificSingleObjectReader<T>
564where
565    T: AvroSchema,
566{
567    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
568        Ok(SpecificSingleObjectReader {
569            inner: GenericSingleObjectReader::new(T::get_schema())?,
570            _model: PhantomData,
571        })
572    }
573}
574
575impl<T> SpecificSingleObjectReader<T>
576where
577    T: AvroSchema + From<Value>,
578{
579    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
580        self.inner.read_value(reader).map(|v| v.into())
581    }
582}
583
584impl<T> SpecificSingleObjectReader<T>
585where
586    T: AvroSchema + DeserializeOwned,
587{
588    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
589        from_value::<T>(&self.inner.read_value(reader)?)
590    }
591}
592
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`.
///
/// The marker is taken from the trailing 16 bytes of `bytes`: a container ends with the sync
/// marker, written after the header and after every data block.
///
/// # Panics
/// Panics if `bytes` is shorter than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    // Exactly 16 bytes is enough to hold a marker, so only strictly shorter input is rejected.
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
603
604#[cfg(test)]
605mod tests {
606    use super::*;
607    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
608    use apache_avro_test_helper::TestResult;
609    use pretty_assertions::assert_eq;
610    use serde::Deserialize;
611    use std::io::Cursor;
612    use uuid::Uuid;
613
614    const SCHEMA: &str = r#"
615    {
616      "type": "record",
617      "name": "test",
618      "fields": [
619        {
620          "name": "a",
621          "type": "long",
622          "default": 42
623        },
624        {
625          "name": "b",
626          "type": "string"
627        }
628      ]
629    }
630    "#;
631    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
632    const ENCODED: &[u8] = &[
633        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
634        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
635        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
636        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
637        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
638        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
639        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
640        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
641        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
642        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
643        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
644        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
645        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
646        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
647    ];
648
649    #[test]
650    fn test_from_avro_datum() -> TestResult {
651        let schema = Schema::parse_str(SCHEMA)?;
652        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
653
654        let mut record = Record::new(&schema).unwrap();
655        record.put("a", 27i64);
656        record.put("b", "foo");
657        let expected = record.into();
658
659        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
660
661        Ok(())
662    }
663
664    #[test]
665    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
666        const TEST_RECORD_SCHEMA_3240: &str = r#"
667    {
668      "type": "record",
669      "name": "test",
670      "fields": [
671        {
672          "name": "a",
673          "type": "long",
674          "default": 42
675        },
676        {
677          "name": "b",
678          "type": "string"
679        },
680        {
681            "name": "a_nullable_array",
682            "type": ["null", {"type": "array", "items": {"type": "string"}}],
683            "default": null
684        },
685        {
686            "name": "a_nullable_boolean",
687            "type": ["null", {"type": "boolean"}],
688            "default": null
689        },
690        {
691            "name": "a_nullable_string",
692            "type": ["null", {"type": "string"}],
693            "default": null
694        }
695      ]
696    }
697    "#;
698        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
699        struct TestRecord3240 {
700            a: i64,
701            b: String,
702            a_nullable_array: Option<Vec<String>>,
703            // we are missing the 'a_nullable_boolean' field to simulate missing keys
704            // a_nullable_boolean: Option<bool>,
705            a_nullable_string: Option<String>,
706        }
707
708        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
709        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
710
711        let expected_record: TestRecord3240 = TestRecord3240 {
712            a: 27i64,
713            b: String::from("foo"),
714            a_nullable_array: None,
715            a_nullable_string: None,
716        };
717
718        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
719        let parsed_record: TestRecord3240 = match &avro_datum {
720            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
721            unexpected => {
722                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
723            }
724        };
725
726        assert_eq!(parsed_record, expected_record);
727
728        Ok(())
729    }
730
731    #[test]
732    fn test_null_union() -> TestResult {
733        let schema = Schema::parse_str(UNION_SCHEMA)?;
734        let mut encoded: &'static [u8] = &[2, 0];
735
736        assert_eq!(
737            from_avro_datum(&schema, &mut encoded, None)?,
738            Value::Union(1, Box::new(Value::Long(0)))
739        );
740
741        Ok(())
742    }
743
744    #[test]
745    fn test_reader_iterator() -> TestResult {
746        let schema = Schema::parse_str(SCHEMA)?;
747        let reader = Reader::with_schema(&schema, ENCODED)?;
748
749        let mut record1 = Record::new(&schema).unwrap();
750        record1.put("a", 27i64);
751        record1.put("b", "foo");
752
753        let mut record2 = Record::new(&schema).unwrap();
754        record2.put("a", 42i64);
755        record2.put("b", "bar");
756
757        let expected = [record1.into(), record2.into()];
758
759        for (i, value) in reader.enumerate() {
760            assert_eq!(value?, expected[i]);
761        }
762
763        Ok(())
764    }
765
766    #[test]
767    fn test_reader_invalid_header() -> TestResult {
768        let schema = Schema::parse_str(SCHEMA)?;
769        let mut invalid = &ENCODED[1..];
770        assert!(Reader::with_schema(&schema, &mut invalid).is_err());
771
772        Ok(())
773    }
774
775    #[test]
776    fn test_reader_invalid_block() -> TestResult {
777        let schema = Schema::parse_str(SCHEMA)?;
778        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
779        let reader = Reader::with_schema(&schema, &mut invalid)?;
780        for value in reader {
781            assert!(value.is_err());
782        }
783
784        Ok(())
785    }
786
787    #[test]
788    fn test_reader_empty_buffer() -> TestResult {
789        let empty = Cursor::new(Vec::new());
790        assert!(Reader::new(empty).is_err());
791
792        Ok(())
793    }
794
795    #[test]
796    fn test_reader_only_header() -> TestResult {
797        let mut invalid = &ENCODED[..165];
798        let reader = Reader::new(&mut invalid)?;
799        for value in reader {
800            assert!(value.is_err());
801        }
802
803        Ok(())
804    }
805
806    #[test]
807    fn test_avro_3405_read_user_metadata_success() -> TestResult {
808        use crate::writer::Writer;
809
810        let schema = Schema::parse_str(SCHEMA)?;
811        let mut writer = Writer::new(&schema, Vec::new())?;
812
813        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
814        user_meta_data.insert(
815            "stringKey".to_string(),
816            "stringValue".to_string().into_bytes(),
817        );
818        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
819        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
820
821        for (k, v) in user_meta_data.iter() {
822            writer.add_user_metadata(k.to_string(), v)?;
823        }
824
825        let mut record = Record::new(&schema).unwrap();
826        record.put("a", 27i64);
827        record.put("b", "foo");
828
829        writer.append(record.clone())?;
830        writer.append(record.clone())?;
831        writer.flush()?;
832        let result = writer.into_inner()?;
833
834        let reader = Reader::new(&result[..])?;
835        assert_eq!(reader.user_metadata(), &user_meta_data);
836
837        Ok(())
838    }
839
840    #[derive(Deserialize, Clone, PartialEq, Debug)]
841    struct TestSingleObjectReader {
842        a: i64,
843        b: f64,
844        c: Vec<String>,
845    }
846
847    impl AvroSchema for TestSingleObjectReader {
848        fn get_schema() -> Schema {
849            let schema = r#"
850            {
851                "type":"record",
852                "name":"TestSingleObjectWrtierSerialize",
853                "fields":[
854                    {
855                        "name":"a",
856                        "type":"long"
857                    },
858                    {
859                        "name":"b",
860                        "type":"double"
861                    },
862                    {
863                        "name":"c",
864                        "type":{
865                            "type":"array",
866                            "items":"string"
867                        }
868                    }
869                ]
870            }
871            "#;
872            Schema::parse_str(schema).unwrap()
873        }
874    }
875
876    impl From<Value> for TestSingleObjectReader {
877        fn from(obj: Value) -> TestSingleObjectReader {
878            if let Value::Record(fields) = obj {
879                let mut a = None;
880                let mut b = None;
881                let mut c = vec![];
882                for (field_name, v) in fields {
883                    match (field_name.as_str(), v) {
884                        ("a", Value::Long(i)) => a = Some(i),
885                        ("b", Value::Double(d)) => b = Some(d),
886                        ("c", Value::Array(v)) => {
887                            for inner_val in v {
888                                if let Value::String(s) = inner_val {
889                                    c.push(s);
890                                }
891                            }
892                        }
893                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
894                    }
895                }
896                TestSingleObjectReader {
897                    a: a.unwrap(),
898                    b: b.unwrap(),
899                    c,
900                }
901            } else {
902                panic!("Expected a Value::Record but was {obj:?}")
903            }
904        }
905    }
906
907    impl From<TestSingleObjectReader> for Value {
908        fn from(obj: TestSingleObjectReader) -> Value {
909            Value::Record(vec![
910                ("a".into(), obj.a.into()),
911                ("b".into(), obj.b.into()),
912                (
913                    "c".into(),
914                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
915                ),
916            ])
917        }
918    }
919
920    #[test]
921    fn test_avro_3507_single_object_reader() -> TestResult {
922        let obj = TestSingleObjectReader {
923            a: 42,
924            b: 3.33,
925            c: vec!["cat".into(), "dog".into()],
926        };
927        let mut to_read = Vec::<u8>::new();
928        to_read.extend_from_slice(&[0xC3, 0x01]);
929        to_read.extend_from_slice(
930            &TestSingleObjectReader::get_schema()
931                .fingerprint::<Rabin>()
932                .bytes[..],
933        );
934        encode(
935            &obj.clone().into(),
936            &TestSingleObjectReader::get_schema(),
937            &mut to_read,
938        )
939        .expect("Encode should succeed");
940        let mut to_read = &to_read[..];
941        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
942            .expect("Schema should resolve");
943        let val = generic_reader
944            .read_value(&mut to_read)
945            .expect("Should read");
946        let expected_value: Value = obj.into();
947        assert_eq!(expected_value, val);
948
949        Ok(())
950    }
951
952    #[test]
953    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
954        let obj = TestSingleObjectReader {
955            a: 42,
956            b: 3.33,
957            c: vec!["cat".into(), "dog".into()],
958        };
959        // The two-byte marker, to show that the message uses this single-record format
960        let to_read_1 = [0xC3, 0x01];
961        let mut to_read_2 = Vec::<u8>::new();
962        to_read_2.extend_from_slice(
963            &TestSingleObjectReader::get_schema()
964                .fingerprint::<Rabin>()
965                .bytes[..],
966        );
967        let mut to_read_3 = Vec::<u8>::new();
968        encode(
969            &obj.clone().into(),
970            &TestSingleObjectReader::get_schema(),
971            &mut to_read_3,
972        )
973        .expect("Encode should succeed");
974        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
975        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
976            .expect("Schema should resolve");
977        let val = generic_reader
978            .read_value(&mut to_read)
979            .expect("Should read");
980        let expected_value: Value = obj.into();
981        assert_eq!(expected_value, val);
982
983        Ok(())
984    }
985
986    #[test]
987    fn test_avro_3507_reader_parity() -> TestResult {
988        let obj = TestSingleObjectReader {
989            a: 42,
990            b: 3.33,
991            c: vec!["cat".into(), "dog".into()],
992        };
993
994        let mut to_read = Vec::<u8>::new();
995        to_read.extend_from_slice(&[0xC3, 0x01]);
996        to_read.extend_from_slice(
997            &TestSingleObjectReader::get_schema()
998                .fingerprint::<Rabin>()
999                .bytes[..],
1000        );
1001        encode(
1002            &obj.clone().into(),
1003            &TestSingleObjectReader::get_schema(),
1004            &mut to_read,
1005        )
1006        .expect("Encode should succeed");
1007        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
1008            .expect("Schema should resolve");
1009        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
1010            .expect("schema should resolve");
1011        let mut to_read1 = &to_read[..];
1012        let mut to_read2 = &to_read[..];
1013        let mut to_read3 = &to_read[..];
1014
1015        let val = generic_reader
1016            .read_value(&mut to_read1)
1017            .expect("Should read");
1018        let read_obj1 = specific_reader
1019            .read_from_value(&mut to_read2)
1020            .expect("Should read from value");
1021        let read_obj2 = specific_reader
1022            .read(&mut to_read3)
1023            .expect("Should read from deserilize");
1024        let expected_value: Value = obj.clone().into();
1025        assert_eq!(obj, read_obj1);
1026        assert_eq!(obj, read_obj2);
1027        assert_eq!(val, expected_value);
1028
1029        Ok(())
1030    }
1031
1032    #[test]
1033    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
1034        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1035        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1036        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
1037            TestSingleObjectReader::get_schema(),
1038            header_builder,
1039        )
1040        .expect("failed to build reader");
1041        let data_to_read: Vec<u8> = vec![
1042            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
1043        ];
1044        let mut to_read = &data_to_read[..];
1045        let read_result = generic_reader
1046            .read_value(&mut to_read)
1047            .map_err(Error::into_details);
1048        matches!(read_result, Err(Details::ReadBytes(_)));
1049        Ok(())
1050    }
1051
1052    #[cfg(not(feature = "snappy"))]
1053    #[test]
1054    fn test_avro_3549_read_not_enabled_codec() {
1055        let snappy_compressed_avro = vec![
1056            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
1057            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
1058            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
1059            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
1060            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
1061            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
1062            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
1063            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
1064            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
1065            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
1066        ];
1067
1068        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
1069            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
1070        } else {
1071            panic!("Expected an error in the reading of the codec!");
1072        }
1073    }
1074}