apache_avro/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling reading from Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    decode::{decode, decode_internal},
22    error::Details,
23    from_value,
24    headers::{HeaderBuilder, RabinFingerprintHeader},
25    schema::{
26        AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
27        resolve_names_with_schemata,
28    },
29    types::Value,
30    util,
31};
32use log::warn;
33use serde::de::DeserializeOwned;
34use serde_json::from_slice;
35use std::{
36    collections::HashMap,
37    io::{ErrorKind, Read},
38    marker::PhantomData,
39    str::FromStr,
40};
41
/// Internal Block reader.
///
/// Holds everything parsed from the Avro header (writer schema, codec, marker, user metadata)
/// together with the buffer of the data block that is currently being decoded.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Source from which the header and the data blocks are read.
    reader: R,
    /// Internal buffering to reduce allocation.
    buf: Vec<u8>,
    /// Offset into `buf` at which the next datum to decode starts.
    buf_idx: usize,
    /// Number of elements expected to exist within this block.
    ///
    /// It is decremented every time a datum is decoded, so it is also the number of data that
    /// are still to be read from `buf`.
    message_count: usize,
    /// 16-byte marker read from the header; every data block must end with the same bytes.
    marker: [u8; 16],
    /// Codec used to decompress each block, taken from the `avro.codec` header entry
    /// (`Codec::Null` when that entry is absent).
    codec: Codec,
    /// Schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Caller-provided schemata made available while parsing the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header entries whose key does not start with `avro.`.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Resolved named schemas handed to the decoder for every datum.
    names_refs: Names,
}
58
59impl<'r, R: Read> Block<'r, R> {
60    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
61        let mut block = Block {
62            reader,
63            codec: Codec::Null,
64            writer_schema: Schema::Null,
65            schemata,
66            buf: vec![],
67            buf_idx: 0,
68            message_count: 0,
69            marker: [0; 16],
70            user_metadata: Default::default(),
71            names_refs: Default::default(),
72        };
73
74        block.read_header()?;
75        Ok(block)
76    }
77
78    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
79    /// its content.
80    fn read_header(&mut self) -> AvroResult<()> {
81        let mut buf = [0u8; 4];
82        self.reader
83            .read_exact(&mut buf)
84            .map_err(Details::ReadHeader)?;
85
86        if buf != [b'O', b'b', b'j', 1u8] {
87            return Err(Details::HeaderMagic.into());
88        }
89
90        let meta_schema = Schema::map(Schema::Bytes);
91        match decode(&meta_schema, &mut self.reader)? {
92            Value::Map(metadata) => {
93                self.read_writer_schema(&metadata)?;
94                self.codec = read_codec(&metadata)?;
95
96                for (key, value) in metadata {
97                    if key == "avro.schema"
98                        || key == "avro.codec"
99                        || key == "avro.codec.compression_level"
100                    {
101                        // already processed
102                    } else if key.starts_with("avro.") {
103                        warn!("Ignoring unknown metadata key: {key}");
104                    } else {
105                        self.read_user_metadata(key, value);
106                    }
107                }
108            }
109            _ => {
110                return Err(Details::GetHeaderMetadata.into());
111            }
112        }
113
114        self.reader
115            .read_exact(&mut self.marker)
116            .map_err(|e| Details::ReadMarker(e).into())
117    }
118
119    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
120        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
121        // invalid bytes.
122        //
123        // The are two cases to handle here:
124        //
125        // 1. `n > self.buf.len()`:
126        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
127        // 2. `n < self.buf.len()`:
128        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
129        //
130        // TODO: Figure out a way to avoid having to truncate for the second case.
131        self.buf.resize(util::safe_len(n)?, 0);
132        self.reader
133            .read_exact(&mut self.buf)
134            .map_err(Details::ReadIntoBuf)?;
135        self.buf_idx = 0;
136        Ok(())
137    }
138
    /// Try to read a data block, also performing schema resolution for the objects contained in
    /// the block. The objects are stored in an internal buffer to the `Reader`.
    ///
    /// A block is read as: the number of objects, the size in bytes of the (possibly compressed)
    /// payload, the payload itself and finally a 16-byte marker that must equal the one found in
    /// the header. The payload is then decompressed in `self.buf` with the header's codec.
    ///
    /// If the stream ends while the object count is being read, `Ok(())` is returned and
    /// `message_count` is left untouched, which the caller interprets as "no more data".
    ///
    /// # Panics
    ///
    /// Panics if it is called while the current block still has undecoded objects.
    fn read_block_next(&mut self) -> AvroResult<()> {
        assert!(self.is_empty(), "Expected self to be empty!");
        match util::read_long(&mut self.reader).map_err(Error::into_details) {
            Ok(block_len) => {
                // NOTE(review): `as usize` does not reject negative values; presumably
                // a well-formed file never contains them - confirm whether a checked conversion
                // is wanted here.
                self.message_count = block_len as usize;
                let block_bytes = util::read_long(&mut self.reader)?;
                self.fill_buf(block_bytes as usize)?;
                let mut marker = [0u8; 16];
                self.reader
                    .read_exact(&mut marker)
                    .map_err(Details::ReadBlockMarker)?;

                // A differing marker means the block is not terminated the way the header
                // announced.
                if marker != self.marker {
                    return Err(Details::GetBlockMarker.into());
                }

                // NOTE (JAB): This doesn't fit this Reader pattern very well.
                // `self.buf` is a growable buffer that is reused as the reader is iterated.
                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
                // and replace `buf` with the new one, instead of reusing the same buffer.
                // We can address this by using some "limited read" type to decode directly
                // into the buffer. But this is fine, for now.
                self.codec.decompress(&mut self.buf)
            }
            Err(Details::ReadVariableIntegerBytes(io_err)) => {
                if let ErrorKind::UnexpectedEof = io_err.kind() {
                    // to not return any error in case we only finished to read cleanly from the stream
                    Ok(())
                } else {
                    Err(Details::ReadVariableIntegerBytes(io_err).into())
                }
            }
            Err(e) => Err(Error::new(e)),
        }
    }
176
    /// Number of objects of the current block that have not been decoded yet.
    fn len(&self) -> usize {
        self.message_count
    }
180
181    fn is_empty(&self) -> bool {
182        self.len() == 0
183    }
184
185    fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
186        if self.is_empty() {
187            self.read_block_next()?;
188            if self.is_empty() {
189                return Ok(None);
190            }
191        }
192
193        let mut block_bytes = &self.buf[self.buf_idx..];
194        let b_original = block_bytes.len();
195
196        let item = decode_internal(
197            &self.writer_schema,
198            &self.names_refs,
199            &None,
200            &mut block_bytes,
201        )?;
202        let item = match read_schema {
203            Some(schema) => item.resolve(schema)?,
204            None => item,
205        };
206
207        if b_original != 0 && b_original == block_bytes.len() {
208            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
209            return Err(Details::ReadBlock.into());
210        }
211        self.buf_idx += b_original - block_bytes.len();
212        self.message_count -= 1;
213        Ok(Some(item))
214    }
215
216    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
217        let json: serde_json::Value = metadata
218            .get("avro.schema")
219            .and_then(|bytes| {
220                if let Value::Bytes(ref bytes) = *bytes {
221                    from_slice(bytes.as_ref()).ok()
222                } else {
223                    None
224                }
225            })
226            .ok_or(Details::GetAvroSchemaFromMap)?;
227        if !self.schemata.is_empty() {
228            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
229            let names: Names = rs
230                .get_names()
231                .iter()
232                .map(|(name, schema)| (name.clone(), (*schema).clone()))
233                .collect();
234            self.writer_schema = Schema::parse_with_names(&json, names)?;
235            resolve_names_with_schemata(
236                self.schemata.iter().copied(),
237                &mut self.names_refs,
238                &None,
239            )?;
240        } else {
241            self.writer_schema = Schema::parse(&json)?;
242            resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
243        }
244        Ok(())
245    }
246
247    fn read_user_metadata(&mut self, key: String, value: Value) {
248        match value {
249            Value::Bytes(ref vec) => {
250                self.user_metadata.insert(key, vec.clone());
251            }
252            wrong => {
253                warn!("User metadata values must be Value::Bytes, found {wrong:?}");
254            }
255        }
256    }
257}
258
259fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
260    let result = metadata
261        .get("avro.codec")
262        .map(|codec| {
263            if let Value::Bytes(ref bytes) = *codec {
264                match std::str::from_utf8(bytes.as_ref()) {
265                    Ok(utf8) => Ok(utf8),
266                    Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
267                }
268            } else {
269                Err(Details::BadCodecMetadata.into())
270            }
271        })
272        .map(|codec_res| match codec_res {
273            Ok(codec) => match Codec::from_str(codec) {
274                Ok(codec) => match codec {
275                    #[cfg(feature = "bzip")]
276                    Codec::Bzip2(_) => {
277                        use crate::Bzip2Settings;
278                        if let Some(Value::Bytes(bytes)) =
279                            metadata.get("avro.codec.compression_level")
280                        {
281                            Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
282                        } else {
283                            Ok(codec)
284                        }
285                    }
286                    #[cfg(feature = "xz")]
287                    Codec::Xz(_) => {
288                        use crate::XzSettings;
289                        if let Some(Value::Bytes(bytes)) =
290                            metadata.get("avro.codec.compression_level")
291                        {
292                            Ok(Codec::Xz(XzSettings::new(bytes[0])))
293                        } else {
294                            Ok(codec)
295                        }
296                    }
297                    #[cfg(feature = "zstandard")]
298                    Codec::Zstandard(_) => {
299                        use crate::ZstandardSettings;
300                        if let Some(Value::Bytes(bytes)) =
301                            metadata.get("avro.codec.compression_level")
302                        {
303                            Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
304                        } else {
305                            Ok(codec)
306                        }
307                    }
308                    _ => Ok(codec),
309                },
310                Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
311            },
312            Err(err) => Err(err),
313        });
314
315    result.unwrap_or(Ok(Codec::Null))
316}
317
/// Main interface for reading Avro formatted values.
///
/// To be used as an iterator:
///
/// ```no_run
/// # use apache_avro::Reader;
/// # use std::io::Cursor;
/// # let input = Cursor::new(Vec::<u8>::new());
/// for value in Reader::new(input).unwrap() {
///     match value {
///         Ok(v) => println!("{:?}", v),
///         Err(e) => println!("Error: {}", e),
///     };
/// }
/// ```
///
/// Iteration stops after the first error has been yielded.
pub struct Reader<'a, R> {
    /// Header information and decoding state of the underlying stream.
    block: Block<'a, R>,
    /// Schema the caller wants the values to be resolved to, if any.
    reader_schema: Option<&'a Schema>,
    /// Set once an error has been yielded, so that iteration ends afterwards.
    errored: bool,
    /// `true` when a reader schema was given and it differs from the writer schema.
    should_resolve_schema: bool,
}
339
340impl<'a, R: Read> Reader<'a, R> {
341    /// Creates a `Reader` given something implementing the `io::Read` trait to read from.
342    /// No reader `Schema` will be set.
343    ///
344    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
345    pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
346        let block = Block::new(reader, vec![])?;
347        let reader = Reader {
348            block,
349            reader_schema: None,
350            errored: false,
351            should_resolve_schema: false,
352        };
353        Ok(reader)
354    }
355
356    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
357    /// to read from.
358    ///
359    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
360    pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
361        let block = Block::new(reader, vec![schema])?;
362        let mut reader = Reader {
363            block,
364            reader_schema: Some(schema),
365            errored: false,
366            should_resolve_schema: false,
367        };
368        // Check if the reader and writer schemas disagree.
369        reader.should_resolve_schema = reader.writer_schema() != schema;
370        Ok(reader)
371    }
372
373    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
374    /// to read from.
375    ///
376    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
377    pub fn with_schemata(
378        schema: &'a Schema,
379        schemata: Vec<&'a Schema>,
380        reader: R,
381    ) -> AvroResult<Reader<'a, R>> {
382        let block = Block::new(reader, schemata)?;
383        let mut reader = Reader {
384            block,
385            reader_schema: Some(schema),
386            errored: false,
387            should_resolve_schema: false,
388        };
389        // Check if the reader and writer schemas disagree.
390        reader.should_resolve_schema = reader.writer_schema() != schema;
391        Ok(reader)
392    }
393
    /// Get a reference to the writer `Schema`.
    ///
    /// This is the schema that was parsed from the header when the `Reader` was created.
    #[inline]
    pub fn writer_schema(&self) -> &Schema {
        &self.block.writer_schema
    }
399
    /// Get a reference to the optional reader `Schema`.
    ///
    /// It is `None` when the `Reader` was created without a reader schema.
    #[inline]
    pub fn reader_schema(&self) -> Option<&Schema> {
        self.reader_schema
    }
405
    /// Get a reference to the user metadata
    ///
    /// These are the header entries whose key does not start with `avro.` and whose value is a
    /// byte string.
    #[inline]
    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
        &self.block.user_metadata
    }
411
412    #[inline]
413    fn read_next(&mut self) -> AvroResult<Option<Value>> {
414        let read_schema = if self.should_resolve_schema {
415            self.reader_schema
416        } else {
417            None
418        };
419
420        self.block.read_next(read_schema)
421    }
422}
423
424impl<R: Read> Iterator for Reader<'_, R> {
425    type Item = AvroResult<Value>;
426
427    fn next(&mut self) -> Option<Self::Item> {
428        // to prevent keep on reading after the first error occurs
429        if self.errored {
430            return None;
431        };
432        match self.read_next() {
433            Ok(opt) => opt.map(Ok),
434            Err(e) => {
435                self.errored = true;
436                Some(Err(e))
437            }
438        }
439    }
440}
441
442/// Decode a `Value` encoded in Avro format given its `Schema` and anything implementing `io::Read`
443/// to read from.
444///
445/// In case a reader `Schema` is provided, schema resolution will also be performed.
446///
447/// **NOTE** This function has a quite small niche of usage and does NOT take care of reading the
448/// header and consecutive data blocks; use [`Reader`](struct.Reader.html) if you don't know what
449/// you are doing, instead.
450pub fn from_avro_datum<R: Read>(
451    writer_schema: &Schema,
452    reader: &mut R,
453    reader_schema: Option<&Schema>,
454) -> AvroResult<Value> {
455    let value = decode(writer_schema, reader)?;
456    match reader_schema {
457        Some(schema) => value.resolve(schema),
458        None => Ok(value),
459    }
460}
461
462/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
463/// to read from.
464/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
465/// schemata to resolve any dependencies.
466///
467/// In case a reader `Schema` is provided, schema resolution will also be performed.
468pub fn from_avro_datum_schemata<R: Read>(
469    writer_schema: &Schema,
470    writer_schemata: Vec<&Schema>,
471    reader: &mut R,
472    reader_schema: Option<&Schema>,
473) -> AvroResult<Value> {
474    from_avro_datum_reader_schemata(
475        writer_schema,
476        writer_schemata,
477        reader,
478        reader_schema,
479        Vec::with_capacity(0),
480    )
481}
482
483/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
484/// to read from.
485/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
486/// schemata to resolve any dependencies.
487///
488/// In case a reader `Schema` is provided, schema resolution will also be performed.
489pub fn from_avro_datum_reader_schemata<R: Read>(
490    writer_schema: &Schema,
491    writer_schemata: Vec<&Schema>,
492    reader: &mut R,
493    reader_schema: Option<&Schema>,
494    reader_schemata: Vec<&Schema>,
495) -> AvroResult<Value> {
496    let rs = ResolvedSchema::try_from(writer_schemata)?;
497    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
498    match reader_schema {
499        Some(schema) => {
500            if reader_schemata.is_empty() {
501                value.resolve(schema)
502            } else {
503                value.resolve_schemata(schema, reader_schemata)
504            }
505        }
506        None => Ok(value),
507    }
508}
509
/// Reader for values that are each prefixed by a fixed header derived from their schema.
///
/// The header expected in front of every value is computed once, when the reader is created.
pub struct GenericSingleObjectReader {
    /// Writer schema together with its resolved names, used to decode the value.
    write_schema: ResolvedOwnedSchema,
    /// Bytes that must precede every value read by this reader.
    expected_header: Vec<u8>,
}
514
515impl GenericSingleObjectReader {
516    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
517        let header_builder = RabinFingerprintHeader::from_schema(&schema);
518        Self::new_with_header_builder(schema, header_builder)
519    }
520
521    pub fn new_with_header_builder<HB: HeaderBuilder>(
522        schema: Schema,
523        header_builder: HB,
524    ) -> AvroResult<GenericSingleObjectReader> {
525        let expected_header = header_builder.build_header();
526        Ok(GenericSingleObjectReader {
527            write_schema: ResolvedOwnedSchema::try_from(schema)?,
528            expected_header,
529        })
530    }
531
532    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
533        let mut header = vec![0; self.expected_header.len()];
534        match reader.read_exact(&mut header) {
535            Ok(_) => {
536                if self.expected_header == header {
537                    decode_internal(
538                        self.write_schema.get_root_schema(),
539                        self.write_schema.get_names(),
540                        &None,
541                        reader,
542                    )
543                } else {
544                    Err(
545                        Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
546                            .into(),
547                    )
548                }
549            }
550            Err(io_error) => Err(Details::ReadHeader(io_error).into()),
551        }
552    }
553}
554
/// Typed wrapper around [`GenericSingleObjectReader`] that uses the schema provided by `T`
/// and converts the decoded value into `T`.
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Untyped reader that validates the header and decodes the value.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a `T`.
    _model: PhantomData<T>,
}
562
563impl<T> SpecificSingleObjectReader<T>
564where
565    T: AvroSchema,
566{
567    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
568        Ok(SpecificSingleObjectReader {
569            inner: GenericSingleObjectReader::new(T::get_schema())?,
570            _model: PhantomData,
571        })
572    }
573}
574
575impl<T> SpecificSingleObjectReader<T>
576where
577    T: AvroSchema + From<Value>,
578{
579    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
580        self.inner.read_value(reader).map(|v| v.into())
581    }
582}
583
584impl<T> SpecificSingleObjectReader<T>
585where
586    T: AvroSchema + DeserializeOwned,
587{
588    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
589        from_value::<T>(&self.inner.read_value(reader)?)
590    }
591}
592
/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`
///
/// The marker is taken from the last 16 bytes of `bytes`.
///
/// # Panics
///
/// Panics when `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    // Exactly 16 bytes are enough to hold a marker, so only shorter inputs are rejected.
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[(bytes.len() - 16)..]);
    marker
}
603
604#[cfg(test)]
605mod tests {
606    use super::*;
607    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
608    use apache_avro_test_helper::TestResult;
609    use pretty_assertions::assert_eq;
610    use serde::Deserialize;
611    use std::io::Cursor;
612    use uuid::Uuid;
613
614    const SCHEMA: &str = r#"
615    {
616      "type": "record",
617      "name": "test",
618      "fields": [
619        {
620          "name": "a",
621          "type": "long",
622          "default": 42
623        },
624        {
625          "name": "b",
626          "type": "string"
627        }
628      ]
629    }
630    "#;
631    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
632    const ENCODED: &[u8] = &[
633        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
634        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
635        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
636        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
637        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
638        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
639        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
640        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
641        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
642        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
643        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
644        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
645        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
646        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
647    ];
648
649    #[test]
650    fn test_from_avro_datum() -> TestResult {
651        let schema = Schema::parse_str(SCHEMA)?;
652        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
653
654        let mut record = Record::new(&schema).unwrap();
655        record.put("a", 27i64);
656        record.put("b", "foo");
657        let expected = record.into();
658
659        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
660
661        Ok(())
662    }
663
    /// Decodes a datum whose schema has nullable fields and maps the result onto a struct that
    /// deliberately lacks one of those fields.
    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            // we are missing the 'a_nullable_boolean' field to simulate missing keys
            // a_nullable_boolean: Option<bool>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        // Only the bytes for `a` and `b` are present; the payload ends before the nullable
        // fields. NOTE(review): presumably the decoder treats the missing unions as null -
        // confirm against the decoder.
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        // The datum must be a record before it can be mapped onto the struct.
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }
730
731    #[test]
732    fn test_null_union() -> TestResult {
733        let schema = Schema::parse_str(UNION_SCHEMA)?;
734        let mut encoded: &'static [u8] = &[2, 0];
735
736        assert_eq!(
737            from_avro_datum(&schema, &mut encoded, None)?,
738            Value::Union(1, Box::new(Value::Long(0)))
739        );
740
741        Ok(())
742    }
743
744    #[test]
745    fn test_reader_iterator() -> TestResult {
746        let schema = Schema::parse_str(SCHEMA)?;
747        let reader = Reader::with_schema(&schema, ENCODED)?;
748
749        let mut record1 = Record::new(&schema).unwrap();
750        record1.put("a", 27i64);
751        record1.put("b", "foo");
752
753        let mut record2 = Record::new(&schema).unwrap();
754        record2.put("a", 42i64);
755        record2.put("b", "bar");
756
757        let expected = [record1.into(), record2.into()];
758
759        for (i, value) in reader.enumerate() {
760            assert_eq!(value?, expected[i]);
761        }
762
763        Ok(())
764    }
765
766    #[test]
767    fn test_reader_invalid_header() -> TestResult {
768        let schema = Schema::parse_str(SCHEMA)?;
769        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
770        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
771
772        Ok(())
773    }
774
775    #[test]
776    fn test_reader_invalid_block() -> TestResult {
777        let schema = Schema::parse_str(SCHEMA)?;
778        let invalid = ENCODED
779            .iter()
780            .copied()
781            .rev()
782            .skip(19)
783            .collect::<Vec<u8>>()
784            .into_iter()
785            .rev()
786            .collect::<Vec<u8>>();
787        let reader = Reader::with_schema(&schema, &invalid[..])?;
788        for value in reader {
789            assert!(value.is_err());
790        }
791
792        Ok(())
793    }
794
795    #[test]
796    fn test_reader_empty_buffer() -> TestResult {
797        let empty = Cursor::new(Vec::new());
798        assert!(Reader::new(empty).is_err());
799
800        Ok(())
801    }
802
803    #[test]
804    fn test_reader_only_header() -> TestResult {
805        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
806        let reader = Reader::new(&invalid[..])?;
807        for value in reader {
808            assert!(value.is_err());
809        }
810
811        Ok(())
812    }
813
814    #[test]
815    fn test_avro_3405_read_user_metadata_success() -> TestResult {
816        use crate::writer::Writer;
817
818        let schema = Schema::parse_str(SCHEMA)?;
819        let mut writer = Writer::new(&schema, Vec::new())?;
820
821        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
822        user_meta_data.insert(
823            "stringKey".to_string(),
824            "stringValue".to_string().into_bytes(),
825        );
826        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
827        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
828
829        for (k, v) in user_meta_data.iter() {
830            writer.add_user_metadata(k.to_string(), v)?;
831        }
832
833        let mut record = Record::new(&schema).unwrap();
834        record.put("a", 27i64);
835        record.put("b", "foo");
836
837        writer.append(record.clone())?;
838        writer.append(record.clone())?;
839        writer.flush()?;
840        let result = writer.into_inner()?;
841
842        let reader = Reader::new(&result[..])?;
843        assert_eq!(reader.user_metadata(), &user_meta_data);
844
845        Ok(())
846    }
847
    /// Model type used by the single-object reader tests; its fields mirror the record schema
    /// returned by its `AvroSchema` implementation.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        /// Maps to the `long` field `a`.
        a: i64,
        /// Maps to the `double` field `b`.
        b: f64,
        /// Maps to the array-of-strings field `c`.
        c: Vec<String>,
    }
854
    /// Provides the record schema (fields `a`, `b` and `c`) for the test model.
    impl AvroSchema for TestSingleObjectReader {
        /// Parses and returns the schema; panics if the embedded JSON is not a valid schema.
        fn get_schema() -> Schema {
            // The literal is part of the test fixture and must stay exactly as it is.
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }
883
884    impl From<Value> for TestSingleObjectReader {
885        fn from(obj: Value) -> TestSingleObjectReader {
886            if let Value::Record(fields) = obj {
887                let mut a = None;
888                let mut b = None;
889                let mut c = vec![];
890                for (field_name, v) in fields {
891                    match (field_name.as_str(), v) {
892                        ("a", Value::Long(i)) => a = Some(i),
893                        ("b", Value::Double(d)) => b = Some(d),
894                        ("c", Value::Array(v)) => {
895                            for inner_val in v {
896                                if let Value::String(s) = inner_val {
897                                    c.push(s);
898                                }
899                            }
900                        }
901                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
902                    }
903                }
904                TestSingleObjectReader {
905                    a: a.unwrap(),
906                    b: b.unwrap(),
907                    c,
908                }
909            } else {
910                panic!("Expected a Value::Record but was {obj:?}")
911            }
912        }
913    }
914
915    impl From<TestSingleObjectReader> for Value {
916        fn from(obj: TestSingleObjectReader) -> Value {
917            Value::Record(vec![
918                ("a".into(), obj.a.into()),
919                ("b".into(), obj.b.into()),
920                (
921                    "c".into(),
922                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
923                ),
924            ])
925        }
926    }
927
928    #[test]
929    fn test_avro_3507_single_object_reader() -> TestResult {
930        let obj = TestSingleObjectReader {
931            a: 42,
932            b: 3.33,
933            c: vec!["cat".into(), "dog".into()],
934        };
935        let mut to_read = Vec::<u8>::new();
936        to_read.extend_from_slice(&[0xC3, 0x01]);
937        to_read.extend_from_slice(
938            &TestSingleObjectReader::get_schema()
939                .fingerprint::<Rabin>()
940                .bytes[..],
941        );
942        encode(
943            &obj.clone().into(),
944            &TestSingleObjectReader::get_schema(),
945            &mut to_read,
946        )
947        .expect("Encode should succeed");
948        let mut to_read = &to_read[..];
949        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
950            .expect("Schema should resolve");
951        let val = generic_reader
952            .read_value(&mut to_read)
953            .expect("Should read");
954        let expected_value: Value = obj.into();
955        assert_eq!(expected_value, val);
956
957        Ok(())
958    }
959
960    #[test]
961    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
962        let obj = TestSingleObjectReader {
963            a: 42,
964            b: 3.33,
965            c: vec!["cat".into(), "dog".into()],
966        };
967        // The two-byte marker, to show that the message uses this single-record format
968        let to_read_1 = [0xC3, 0x01];
969        let mut to_read_2 = Vec::<u8>::new();
970        to_read_2.extend_from_slice(
971            &TestSingleObjectReader::get_schema()
972                .fingerprint::<Rabin>()
973                .bytes[..],
974        );
975        let mut to_read_3 = Vec::<u8>::new();
976        encode(
977            &obj.clone().into(),
978            &TestSingleObjectReader::get_schema(),
979            &mut to_read_3,
980        )
981        .expect("Encode should succeed");
982        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
983        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
984            .expect("Schema should resolve");
985        let val = generic_reader
986            .read_value(&mut to_read)
987            .expect("Should read");
988        let expected_value: Value = obj.into();
989        assert_eq!(expected_value, val);
990
991        Ok(())
992    }
993
994    #[test]
995    fn test_avro_3507_reader_parity() -> TestResult {
996        let obj = TestSingleObjectReader {
997            a: 42,
998            b: 3.33,
999            c: vec!["cat".into(), "dog".into()],
1000        };
1001
1002        let mut to_read = Vec::<u8>::new();
1003        to_read.extend_from_slice(&[0xC3, 0x01]);
1004        to_read.extend_from_slice(
1005            &TestSingleObjectReader::get_schema()
1006                .fingerprint::<Rabin>()
1007                .bytes[..],
1008        );
1009        encode(
1010            &obj.clone().into(),
1011            &TestSingleObjectReader::get_schema(),
1012            &mut to_read,
1013        )
1014        .expect("Encode should succeed");
1015        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
1016            .expect("Schema should resolve");
1017        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
1018            .expect("schema should resolve");
1019        let mut to_read1 = &to_read[..];
1020        let mut to_read2 = &to_read[..];
1021        let mut to_read3 = &to_read[..];
1022
1023        let val = generic_reader
1024            .read_value(&mut to_read1)
1025            .expect("Should read");
1026        let read_obj1 = specific_reader
1027            .read_from_value(&mut to_read2)
1028            .expect("Should read from value");
1029        let read_obj2 = specific_reader
1030            .read(&mut to_read3)
1031            .expect("Should read from deserilize");
1032        let expected_value: Value = obj.clone().into();
1033        assert_eq!(obj, read_obj1);
1034        assert_eq!(obj, read_obj2);
1035        assert_eq!(val, expected_value);
1036
1037        Ok(())
1038    }
1039
1040    #[test]
1041    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
1042        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1043        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1044        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
1045            TestSingleObjectReader::get_schema(),
1046            header_builder,
1047        )
1048        .expect("failed to build reader");
1049        let data_to_read: Vec<u8> = vec![
1050            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
1051        ];
1052        let mut to_read = &data_to_read[..];
1053        let read_result = generic_reader
1054            .read_value(&mut to_read)
1055            .map_err(Error::into_details);
1056        matches!(read_result, Err(Details::ReadBytes(_)));
1057        Ok(())
1058    }
1059
1060    #[cfg(not(feature = "snappy"))]
1061    #[test]
1062    fn test_avro_3549_read_not_enabled_codec() {
1063        let snappy_compressed_avro = vec![
1064            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
1065            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
1066            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
1067            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
1068            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
1069            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
1070            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
1071            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
1072            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
1073            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
1074        ];
1075
1076        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
1077            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
1078        } else {
1079            panic!("Expected an error in the reading of the codec!");
1080        }
1081    }
1082}