1use crate::{
20 AvroResult, Codec, Error,
21 decode::{decode, decode_internal},
22 error::Details,
23 from_value,
24 headers::{HeaderBuilder, RabinFingerprintHeader},
25 schema::{
26 AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
27 resolve_names_with_schemata,
28 },
29 types::Value,
30 util,
31};
32use log::warn;
33use serde::de::DeserializeOwned;
34use serde_json::from_slice;
35use std::{
36 collections::HashMap,
37 io::{ErrorKind, Read},
38 marker::PhantomData,
39 str::FromStr,
40};
41
42#[derive(Debug, Clone)]
44struct Block<'r, R> {
45 reader: R,
46 buf: Vec<u8>,
48 buf_idx: usize,
49 message_count: usize,
51 marker: [u8; 16],
52 codec: Codec,
53 writer_schema: Schema,
54 schemata: Vec<&'r Schema>,
55 user_metadata: HashMap<String, Vec<u8>>,
56 names_refs: Names,
57}
58
59impl<'r, R: Read> Block<'r, R> {
60 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
61 let mut block = Block {
62 reader,
63 codec: Codec::Null,
64 writer_schema: Schema::Null,
65 schemata,
66 buf: vec![],
67 buf_idx: 0,
68 message_count: 0,
69 marker: [0; 16],
70 user_metadata: Default::default(),
71 names_refs: Default::default(),
72 };
73
74 block.read_header()?;
75 Ok(block)
76 }
77
78 fn read_header(&mut self) -> AvroResult<()> {
81 let mut buf = [0u8; 4];
82 self.reader
83 .read_exact(&mut buf)
84 .map_err(Details::ReadHeader)?;
85
86 if buf != [b'O', b'b', b'j', 1u8] {
87 return Err(Details::HeaderMagic.into());
88 }
89
90 let meta_schema = Schema::map(Schema::Bytes);
91 match decode(&meta_schema, &mut self.reader)? {
92 Value::Map(metadata) => {
93 self.read_writer_schema(&metadata)?;
94 self.codec = read_codec(&metadata)?;
95
96 for (key, value) in metadata {
97 if key == "avro.schema"
98 || key == "avro.codec"
99 || key == "avro.codec.compression_level"
100 {
101 } else if key.starts_with("avro.") {
103 warn!("Ignoring unknown metadata key: {key}");
104 } else {
105 self.read_user_metadata(key, value);
106 }
107 }
108 }
109 _ => {
110 return Err(Details::GetHeaderMetadata.into());
111 }
112 }
113
114 self.reader
115 .read_exact(&mut self.marker)
116 .map_err(|e| Details::ReadMarker(e).into())
117 }
118
119 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
120 self.buf.resize(util::safe_len(n)?, 0);
132 self.reader
133 .read_exact(&mut self.buf)
134 .map_err(Details::ReadIntoBuf)?;
135 self.buf_idx = 0;
136 Ok(())
137 }
138
139 fn read_block_next(&mut self) -> AvroResult<()> {
142 assert!(self.is_empty(), "Expected self to be empty!");
143 match util::read_long(&mut self.reader).map_err(Error::into_details) {
144 Ok(block_len) => {
145 self.message_count = block_len as usize;
146 let block_bytes = util::read_long(&mut self.reader)?;
147 self.fill_buf(block_bytes as usize)?;
148 let mut marker = [0u8; 16];
149 self.reader
150 .read_exact(&mut marker)
151 .map_err(Details::ReadBlockMarker)?;
152
153 if marker != self.marker {
154 return Err(Details::GetBlockMarker.into());
155 }
156
157 self.codec.decompress(&mut self.buf)
164 }
165 Err(Details::ReadVariableIntegerBytes(io_err)) => {
166 if let ErrorKind::UnexpectedEof = io_err.kind() {
167 Ok(())
169 } else {
170 Err(Details::ReadVariableIntegerBytes(io_err).into())
171 }
172 }
173 Err(e) => Err(Error::new(e)),
174 }
175 }
176
177 fn len(&self) -> usize {
178 self.message_count
179 }
180
181 fn is_empty(&self) -> bool {
182 self.len() == 0
183 }
184
185 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
186 if self.is_empty() {
187 self.read_block_next()?;
188 if self.is_empty() {
189 return Ok(None);
190 }
191 }
192
193 let mut block_bytes = &self.buf[self.buf_idx..];
194 let b_original = block_bytes.len();
195
196 let item = decode_internal(
197 &self.writer_schema,
198 &self.names_refs,
199 &None,
200 &mut block_bytes,
201 )?;
202 let item = match read_schema {
203 Some(schema) => item.resolve(schema)?,
204 None => item,
205 };
206
207 if b_original != 0 && b_original == block_bytes.len() {
208 return Err(Details::ReadBlock.into());
210 }
211 self.buf_idx += b_original - block_bytes.len();
212 self.message_count -= 1;
213 Ok(Some(item))
214 }
215
216 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
217 let json: serde_json::Value = metadata
218 .get("avro.schema")
219 .and_then(|bytes| {
220 if let Value::Bytes(ref bytes) = *bytes {
221 from_slice(bytes.as_ref()).ok()
222 } else {
223 None
224 }
225 })
226 .ok_or(Details::GetAvroSchemaFromMap)?;
227 if !self.schemata.is_empty() {
228 let rs = ResolvedSchema::try_from(self.schemata.clone())?;
229 let names: Names = rs
230 .get_names()
231 .iter()
232 .map(|(name, schema)| (name.clone(), (*schema).clone()))
233 .collect();
234 self.writer_schema = Schema::parse_with_names(&json, names)?;
235 resolve_names_with_schemata(
236 self.schemata.iter().copied(),
237 &mut self.names_refs,
238 &None,
239 )?;
240 } else {
241 self.writer_schema = Schema::parse(&json)?;
242 resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
243 }
244 Ok(())
245 }
246
247 fn read_user_metadata(&mut self, key: String, value: Value) {
248 match value {
249 Value::Bytes(ref vec) => {
250 self.user_metadata.insert(key, vec.clone());
251 }
252 wrong => {
253 warn!("User metadata values must be Value::Bytes, found {wrong:?}");
254 }
255 }
256 }
257}
258
259fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
260 let result = metadata
261 .get("avro.codec")
262 .map(|codec| {
263 if let Value::Bytes(ref bytes) = *codec {
264 match std::str::from_utf8(bytes.as_ref()) {
265 Ok(utf8) => Ok(utf8),
266 Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
267 }
268 } else {
269 Err(Details::BadCodecMetadata.into())
270 }
271 })
272 .map(|codec_res| match codec_res {
273 Ok(codec) => match Codec::from_str(codec) {
274 Ok(codec) => match codec {
275 #[cfg(feature = "bzip")]
276 Codec::Bzip2(_) => {
277 use crate::Bzip2Settings;
278 if let Some(Value::Bytes(bytes)) =
279 metadata.get("avro.codec.compression_level")
280 {
281 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
282 } else {
283 Ok(codec)
284 }
285 }
286 #[cfg(feature = "xz")]
287 Codec::Xz(_) => {
288 use crate::XzSettings;
289 if let Some(Value::Bytes(bytes)) =
290 metadata.get("avro.codec.compression_level")
291 {
292 Ok(Codec::Xz(XzSettings::new(bytes[0])))
293 } else {
294 Ok(codec)
295 }
296 }
297 #[cfg(feature = "zstandard")]
298 Codec::Zstandard(_) => {
299 use crate::ZstandardSettings;
300 if let Some(Value::Bytes(bytes)) =
301 metadata.get("avro.codec.compression_level")
302 {
303 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
304 } else {
305 Ok(codec)
306 }
307 }
308 _ => Ok(codec),
309 },
310 Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
311 },
312 Err(err) => Err(err),
313 });
314
315 result.unwrap_or(Ok(Codec::Null))
316}
317
318pub struct Reader<'a, R> {
334 block: Block<'a, R>,
335 reader_schema: Option<&'a Schema>,
336 errored: bool,
337 should_resolve_schema: bool,
338}
339
340impl<'a, R: Read> Reader<'a, R> {
341 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
346 let block = Block::new(reader, vec![])?;
347 let reader = Reader {
348 block,
349 reader_schema: None,
350 errored: false,
351 should_resolve_schema: false,
352 };
353 Ok(reader)
354 }
355
356 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
361 let block = Block::new(reader, vec![schema])?;
362 let mut reader = Reader {
363 block,
364 reader_schema: Some(schema),
365 errored: false,
366 should_resolve_schema: false,
367 };
368 reader.should_resolve_schema = reader.writer_schema() != schema;
370 Ok(reader)
371 }
372
373 pub fn with_schemata(
378 schema: &'a Schema,
379 schemata: Vec<&'a Schema>,
380 reader: R,
381 ) -> AvroResult<Reader<'a, R>> {
382 let block = Block::new(reader, schemata)?;
383 let mut reader = Reader {
384 block,
385 reader_schema: Some(schema),
386 errored: false,
387 should_resolve_schema: false,
388 };
389 reader.should_resolve_schema = reader.writer_schema() != schema;
391 Ok(reader)
392 }
393
394 #[inline]
396 pub fn writer_schema(&self) -> &Schema {
397 &self.block.writer_schema
398 }
399
400 #[inline]
402 pub fn reader_schema(&self) -> Option<&Schema> {
403 self.reader_schema
404 }
405
406 #[inline]
408 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
409 &self.block.user_metadata
410 }
411
412 #[inline]
413 fn read_next(&mut self) -> AvroResult<Option<Value>> {
414 let read_schema = if self.should_resolve_schema {
415 self.reader_schema
416 } else {
417 None
418 };
419
420 self.block.read_next(read_schema)
421 }
422}
423
424impl<R: Read> Iterator for Reader<'_, R> {
425 type Item = AvroResult<Value>;
426
427 fn next(&mut self) -> Option<Self::Item> {
428 if self.errored {
430 return None;
431 };
432 match self.read_next() {
433 Ok(opt) => opt.map(Ok),
434 Err(e) => {
435 self.errored = true;
436 Some(Err(e))
437 }
438 }
439 }
440}
441
442pub fn from_avro_datum<R: Read>(
451 writer_schema: &Schema,
452 reader: &mut R,
453 reader_schema: Option<&Schema>,
454) -> AvroResult<Value> {
455 let value = decode(writer_schema, reader)?;
456 match reader_schema {
457 Some(schema) => value.resolve(schema),
458 None => Ok(value),
459 }
460}
461
462pub fn from_avro_datum_schemata<R: Read>(
469 writer_schema: &Schema,
470 writer_schemata: Vec<&Schema>,
471 reader: &mut R,
472 reader_schema: Option<&Schema>,
473) -> AvroResult<Value> {
474 from_avro_datum_reader_schemata(
475 writer_schema,
476 writer_schemata,
477 reader,
478 reader_schema,
479 Vec::with_capacity(0),
480 )
481}
482
483pub fn from_avro_datum_reader_schemata<R: Read>(
490 writer_schema: &Schema,
491 writer_schemata: Vec<&Schema>,
492 reader: &mut R,
493 reader_schema: Option<&Schema>,
494 reader_schemata: Vec<&Schema>,
495) -> AvroResult<Value> {
496 let rs = ResolvedSchema::try_from(writer_schemata)?;
497 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
498 match reader_schema {
499 Some(schema) => {
500 if reader_schemata.is_empty() {
501 value.resolve(schema)
502 } else {
503 value.resolve_schemata(schema, reader_schemata)
504 }
505 }
506 None => Ok(value),
507 }
508}
509
510pub struct GenericSingleObjectReader {
511 write_schema: ResolvedOwnedSchema,
512 expected_header: Vec<u8>,
513}
514
515impl GenericSingleObjectReader {
516 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
517 let header_builder = RabinFingerprintHeader::from_schema(&schema);
518 Self::new_with_header_builder(schema, header_builder)
519 }
520
521 pub fn new_with_header_builder<HB: HeaderBuilder>(
522 schema: Schema,
523 header_builder: HB,
524 ) -> AvroResult<GenericSingleObjectReader> {
525 let expected_header = header_builder.build_header();
526 Ok(GenericSingleObjectReader {
527 write_schema: ResolvedOwnedSchema::try_from(schema)?,
528 expected_header,
529 })
530 }
531
532 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
533 let mut header = vec![0; self.expected_header.len()];
534 match reader.read_exact(&mut header) {
535 Ok(_) => {
536 if self.expected_header == header {
537 decode_internal(
538 self.write_schema.get_root_schema(),
539 self.write_schema.get_names(),
540 &None,
541 reader,
542 )
543 } else {
544 Err(
545 Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
546 .into(),
547 )
548 }
549 }
550 Err(io_error) => Err(Details::ReadHeader(io_error).into()),
551 }
552 }
553}
554
555pub struct SpecificSingleObjectReader<T>
556where
557 T: AvroSchema,
558{
559 inner: GenericSingleObjectReader,
560 _model: PhantomData<T>,
561}
562
563impl<T> SpecificSingleObjectReader<T>
564where
565 T: AvroSchema,
566{
567 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
568 Ok(SpecificSingleObjectReader {
569 inner: GenericSingleObjectReader::new(T::get_schema())?,
570 _model: PhantomData,
571 })
572 }
573}
574
575impl<T> SpecificSingleObjectReader<T>
576where
577 T: AvroSchema + From<Value>,
578{
579 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
580 self.inner.read_value(reader).map(|v| v.into())
581 }
582}
583
584impl<T> SpecificSingleObjectReader<T>
585where
586 T: AvroSchema + DeserializeOwned,
587{
588 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
589 from_value::<T>(&self.inner.read_value(reader)?)
590 }
591}
592
/// Returns the last 16 bytes of `bytes`.
///
/// Every block of an Avro container file ends with the file's sync marker, so for the bytes
/// of a complete container file this is that marker.
///
/// # Panics
/// If `bytes` is shorter than 16 bytes. Exactly 16 bytes is accepted; the previous `> 16`
/// check rejected it despite the slice being long enough.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    // `u8` is `Copy`, so a plain memcpy suffices.
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
602}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use serde::Deserialize;
    use std::io::Cursor;
    use uuid::Uuid;

    /// Record schema shared by most tests: a `long` with a default and a `string`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// A container file for `SCHEMA` (null codec) holding the records
    /// {a: 27, b: "foo"} and {a: 42, b: "bar"}.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);

        Ok(())
    }

    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {
              "name": "a",
              "type": "long",
              "default": 42
            },
            {
              "name": "b",
              "type": "string"
            },
            {
              "name": "a_nullable_array",
              "type": ["null", {"type": "array", "items": {"type": "string"}}],
              "default": null
            },
            {
              "name": "a_nullable_boolean",
              "type": ["null", {"type": "boolean"}],
              "default": null
            },
            {
              "name": "a_nullable_string",
              "type": ["null", {"type": "string"}],
              "default": null
            }
          ]
        }
        "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
            // The schema's `a_nullable_boolean` field has no counterpart here, so this test
            // also covers a record field that the target struct does not declare.
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None)?,
            Value::Union(1, Box::new(Value::Long(0)))
        );

        Ok(())
    }

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::with_schema(&schema, ENCODED)?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected = [record1.into(), record2.into()];

        let mut read_count = 0;
        for (i, value) in reader.enumerate() {
            assert_eq!(value?, expected[i]);
            read_count += 1;
        }
        // Without this, a reader that yields nothing would make the loop pass vacuously.
        assert_eq!(read_count, expected.len());

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let invalid = ENCODED
            .iter()
            .copied()
            .rev()
            .skip(19)
            .collect::<Vec<u8>>()
            .into_iter()
            .rev()
            .collect::<Vec<u8>>();
        let reader = Reader::with_schema(&schema, &invalid[..])?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
        let reader = Reader::new(&invalid[..])?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append(record.clone())?;
        writer.append(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    impl From<Value> for TestSingleObjectReader {
        fn from(obj: Value) -> TestSingleObjectReader {
            if let Value::Record(fields) = obj {
                let mut a = None;
                let mut b = None;
                let mut c = vec![];
                for (field_name, v) in fields {
                    match (field_name.as_str(), v) {
                        ("a", Value::Long(i)) => a = Some(i),
                        ("b", Value::Double(d)) => b = Some(d),
                        ("c", Value::Array(v)) => {
                            for inner_val in v {
                                if let Value::String(s) = inner_val {
                                    c.push(s);
                                }
                            }
                        }
                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                    }
                }
                TestSingleObjectReader {
                    a: a.unwrap(),
                    b: b.unwrap(),
                    c,
                }
            } else {
                panic!("Expected a Value::Record but was {obj:?}")
            }
        }
    }

    impl From<TestSingleObjectReader> for Value {
        fn from(obj: TestSingleObjectReader) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let mut to_read = &to_read[..];
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let to_read_1 = [0xC3, 0x01];
        let mut to_read_2 = Vec::<u8>::new();
        to_read_2.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        let mut to_read_3 = Vec::<u8>::new();
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read_3,
        )
        .expect("Encode should succeed");
        // The header and payload arrive from three chained readers.
        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");
        let mut to_read1 = &to_read[..];
        let mut to_read2 = &to_read[..];
        let mut to_read3 = &to_read[..];

        let val = generic_reader
            .read_value(&mut to_read1)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut to_read2)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut to_read3)
            .expect("Should read from deserilize");
        let expected_value: Value = obj.clone().into();
        assert_eq!(obj, read_obj1);
        assert_eq!(obj, read_obj2);
        assert_eq!(val, expected_value);

        Ok(())
    }

    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
            TestSingleObjectReader::get_schema(),
            header_builder,
        )
        .expect("failed to build reader");
        // Only the Glue header (2 prefix bytes + 16 UUID bytes); no payload follows.
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
        ];
        let mut to_read = &data_to_read[..];
        let read_result = generic_reader
            .read_value(&mut to_read)
            .map_err(Error::into_details);
        // A bare `matches!(...)` discards its result, so it asserted nothing. The alternate
        // header must be accepted, and the read must still fail on the empty payload.
        match read_result {
            Ok(value) => panic!("expected reading an empty payload to fail, got {value:?}"),
            Err(Details::SingleObjectHeaderMismatch(..) | Details::ReadHeader(_)) => {
                panic!("the Glue schema UUID header should have been accepted")
            }
            Err(_) => {}
        }
        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
        } else {
            panic!("Expected an error in the reading of the codec!");
        }
    }
}