1use crate::{
20 decode::{decode, decode_internal},
21 from_value,
22 headers::{HeaderBuilder, RabinFingerprintHeader},
23 schema::{
24 resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25 ResolvedSchema, Schema,
26 },
27 types::Value,
28 util, AvroResult, Codec, Error,
29};
30use log::warn;
31use serde::de::DeserializeOwned;
32use serde_json::from_slice;
33use std::{
34 collections::HashMap,
35 io::{ErrorKind, Read},
36 marker::PhantomData,
37 str::FromStr,
38};
39
/// Internal reader state for a stream that starts with the `Obj\x01` magic:
/// the underlying byte source plus everything parsed from the header and the
/// data block currently being consumed.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Byte source the header and the data blocks are read from.
    reader: R,
    /// Bytes of the current data block; the codec decompresses them in place.
    buf: Vec<u8>,
    /// Offset into `buf` at which the next value starts.
    buf_idx: usize,
    /// Number of values still to be decoded from the current block.
    message_count: usize,
    /// 16-byte marker read at the end of the header; the marker that follows
    /// every block is compared against it.
    marker: [u8; 16],
    /// Codec taken from the header metadata (`Codec::Null` when none is given).
    codec: Codec,
    /// Writer schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Caller-supplied schemata used when parsing the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header entries whose keys do not start with `avro.`.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Named schemas handed to the decoder for every value.
    names_refs: Names,
}
56
57impl<'r, R: Read> Block<'r, R> {
58 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
59 let mut block = Block {
60 reader,
61 codec: Codec::Null,
62 writer_schema: Schema::Null,
63 schemata,
64 buf: vec![],
65 buf_idx: 0,
66 message_count: 0,
67 marker: [0; 16],
68 user_metadata: Default::default(),
69 names_refs: Default::default(),
70 };
71
72 block.read_header()?;
73 Ok(block)
74 }
75
76 fn read_header(&mut self) -> AvroResult<()> {
79 let mut buf = [0u8; 4];
80 self.reader
81 .read_exact(&mut buf)
82 .map_err(Error::ReadHeader)?;
83
84 if buf != [b'O', b'b', b'j', 1u8] {
85 return Err(Error::HeaderMagic);
86 }
87
88 let meta_schema = Schema::map(Schema::Bytes);
89 if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90 self.read_writer_schema(&metadata)?;
91 self.codec = read_codec(&metadata)?;
92
93 for (key, value) in metadata {
94 if key == "avro.schema"
95 || key == "avro.codec"
96 || key == "avro.codec.compression_level"
97 {
98 } else if key.starts_with("avro.") {
100 warn!("Ignoring unknown metadata key: {key}");
101 } else {
102 self.read_user_metadata(key, value);
103 }
104 }
105 } else {
106 return Err(Error::GetHeaderMetadata);
107 }
108
109 self.reader
110 .read_exact(&mut self.marker)
111 .map_err(Error::ReadMarker)
112 }
113
114 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
115 self.buf.resize(util::safe_len(n)?, 0);
127 self.reader
128 .read_exact(&mut self.buf)
129 .map_err(Error::ReadIntoBuf)?;
130 self.buf_idx = 0;
131 Ok(())
132 }
133
134 fn read_block_next(&mut self) -> AvroResult<()> {
137 assert!(self.is_empty(), "Expected self to be empty!");
138 match util::read_long(&mut self.reader) {
139 Ok(block_len) => {
140 self.message_count = block_len as usize;
141 let block_bytes = util::read_long(&mut self.reader)?;
142 self.fill_buf(block_bytes as usize)?;
143 let mut marker = [0u8; 16];
144 self.reader
145 .read_exact(&mut marker)
146 .map_err(Error::ReadBlockMarker)?;
147
148 if marker != self.marker {
149 return Err(Error::GetBlockMarker);
150 }
151
152 self.codec.decompress(&mut self.buf)
159 }
160 Err(Error::ReadVariableIntegerBytes(io_err)) => {
161 if let ErrorKind::UnexpectedEof = io_err.kind() {
162 Ok(())
164 } else {
165 Err(Error::ReadVariableIntegerBytes(io_err))
166 }
167 }
168 Err(e) => Err(e),
169 }
170 }
171
    /// Number of values that remain to be decoded from the current block.
    fn len(&self) -> usize {
        self.message_count
    }
175
176 fn is_empty(&self) -> bool {
177 self.len() == 0
178 }
179
180 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
181 if self.is_empty() {
182 self.read_block_next()?;
183 if self.is_empty() {
184 return Ok(None);
185 }
186 }
187
188 let mut block_bytes = &self.buf[self.buf_idx..];
189 let b_original = block_bytes.len();
190
191 let item = decode_internal(
192 &self.writer_schema,
193 &self.names_refs,
194 &None,
195 &mut block_bytes,
196 )?;
197 let item = match read_schema {
198 Some(schema) => item.resolve(schema)?,
199 None => item,
200 };
201
202 if b_original != 0 && b_original == block_bytes.len() {
203 return Err(Error::ReadBlock);
205 }
206 self.buf_idx += b_original - block_bytes.len();
207 self.message_count -= 1;
208 Ok(Some(item))
209 }
210
211 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
212 let json: serde_json::Value = metadata
213 .get("avro.schema")
214 .and_then(|bytes| {
215 if let Value::Bytes(ref bytes) = *bytes {
216 from_slice(bytes.as_ref()).ok()
217 } else {
218 None
219 }
220 })
221 .ok_or(Error::GetAvroSchemaFromMap)?;
222 if !self.schemata.is_empty() {
223 let rs = ResolvedSchema::try_from(self.schemata.clone())?;
224 let names: Names = rs
225 .get_names()
226 .iter()
227 .map(|(name, schema)| (name.clone(), (*schema).clone()))
228 .collect();
229 self.writer_schema = Schema::parse_with_names(&json, names)?;
230 resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
231 } else {
232 self.writer_schema = Schema::parse(&json)?;
233 resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
234 }
235 Ok(())
236 }
237
238 fn read_user_metadata(&mut self, key: String, value: Value) {
239 match value {
240 Value::Bytes(ref vec) => {
241 self.user_metadata.insert(key, vec.clone());
242 }
243 wrong => {
244 warn!("User metadata values must be Value::Bytes, found {wrong:?}");
245 }
246 }
247 }
248}
249
250fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
251 let result = metadata
252 .get("avro.codec")
253 .map(|codec| {
254 if let Value::Bytes(ref bytes) = *codec {
255 match std::str::from_utf8(bytes.as_ref()) {
256 Ok(utf8) => Ok(utf8),
257 Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
258 }
259 } else {
260 Err(Error::BadCodecMetadata)
261 }
262 })
263 .map(|codec_res| match codec_res {
264 Ok(codec) => match Codec::from_str(codec) {
265 Ok(codec) => match codec {
266 #[cfg(feature = "bzip")]
267 Codec::Bzip2(_) => {
268 use crate::Bzip2Settings;
269 if let Some(Value::Bytes(bytes)) =
270 metadata.get("avro.codec.compression_level")
271 {
272 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
273 } else {
274 Ok(codec)
275 }
276 }
277 #[cfg(feature = "xz")]
278 Codec::Xz(_) => {
279 use crate::XzSettings;
280 if let Some(Value::Bytes(bytes)) =
281 metadata.get("avro.codec.compression_level")
282 {
283 Ok(Codec::Xz(XzSettings::new(bytes[0])))
284 } else {
285 Ok(codec)
286 }
287 }
288 #[cfg(feature = "zstandard")]
289 Codec::Zstandard(_) => {
290 use crate::ZstandardSettings;
291 if let Some(Value::Bytes(bytes)) =
292 metadata.get("avro.codec.compression_level")
293 {
294 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
295 } else {
296 Ok(codec)
297 }
298 }
299 _ => Ok(codec),
300 },
301 Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
302 },
303 Err(err) => Err(err),
304 });
305
306 result.unwrap_or(Ok(Codec::Null))
307}
308
/// Reads values from a stream and yields them through its `Iterator`
/// implementation as `AvroResult<Value>`.
///
/// Build one with `Reader::new`, `Reader::with_schema` or
/// `Reader::with_schemata`.
pub struct Reader<'a, R> {
    /// Header state and block-level decoding.
    block: Block<'a, R>,
    /// Schema supplied by the caller, if any.
    reader_schema: Option<&'a Schema>,
    /// Set once iteration has produced an error; later `next` calls return `None`.
    errored: bool,
    /// `true` when a reader schema was supplied and it differs from the writer
    /// schema, in which case every decoded value is resolved against it.
    should_resolve_schema: bool,
}
330
331impl<'a, R: Read> Reader<'a, R> {
332 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
337 let block = Block::new(reader, vec![])?;
338 let reader = Reader {
339 block,
340 reader_schema: None,
341 errored: false,
342 should_resolve_schema: false,
343 };
344 Ok(reader)
345 }
346
347 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
352 let block = Block::new(reader, vec![schema])?;
353 let mut reader = Reader {
354 block,
355 reader_schema: Some(schema),
356 errored: false,
357 should_resolve_schema: false,
358 };
359 reader.should_resolve_schema = reader.writer_schema() != schema;
361 Ok(reader)
362 }
363
364 pub fn with_schemata(
369 schema: &'a Schema,
370 schemata: Vec<&'a Schema>,
371 reader: R,
372 ) -> AvroResult<Reader<'a, R>> {
373 let block = Block::new(reader, schemata)?;
374 let mut reader = Reader {
375 block,
376 reader_schema: Some(schema),
377 errored: false,
378 should_resolve_schema: false,
379 };
380 reader.should_resolve_schema = reader.writer_schema() != schema;
382 Ok(reader)
383 }
384
    /// Returns the writer schema that was parsed from the stream header.
    #[inline]
    pub fn writer_schema(&self) -> &Schema {
        &self.block.writer_schema
    }
390
    /// Returns the reader schema supplied at construction, or `None` when the
    /// reader was created with `Reader::new`.
    #[inline]
    pub fn reader_schema(&self) -> Option<&Schema> {
        self.reader_schema
    }
396
    /// Returns the header entries whose keys do not start with `avro.`, keyed
    /// by name with their raw byte values.
    #[inline]
    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
        &self.block.user_metadata
    }
402
403 #[inline]
404 fn read_next(&mut self) -> AvroResult<Option<Value>> {
405 let read_schema = if self.should_resolve_schema {
406 self.reader_schema
407 } else {
408 None
409 };
410
411 self.block.read_next(read_schema)
412 }
413}
414
415impl<R: Read> Iterator for Reader<'_, R> {
416 type Item = AvroResult<Value>;
417
418 fn next(&mut self) -> Option<Self::Item> {
419 if self.errored {
421 return None;
422 };
423 match self.read_next() {
424 Ok(opt) => opt.map(Ok),
425 Err(e) => {
426 self.errored = true;
427 Some(Err(e))
428 }
429 }
430 }
431}
432
433pub fn from_avro_datum<R: Read>(
442 writer_schema: &Schema,
443 reader: &mut R,
444 reader_schema: Option<&Schema>,
445) -> AvroResult<Value> {
446 let value = decode(writer_schema, reader)?;
447 match reader_schema {
448 Some(schema) => value.resolve(schema),
449 None => Ok(value),
450 }
451}
452
453pub fn from_avro_datum_schemata<R: Read>(
460 writer_schema: &Schema,
461 writer_schemata: Vec<&Schema>,
462 reader: &mut R,
463 reader_schema: Option<&Schema>,
464) -> AvroResult<Value> {
465 from_avro_datum_reader_schemata(
466 writer_schema,
467 writer_schemata,
468 reader,
469 reader_schema,
470 Vec::with_capacity(0),
471 )
472}
473
474pub fn from_avro_datum_reader_schemata<R: Read>(
481 writer_schema: &Schema,
482 writer_schemata: Vec<&Schema>,
483 reader: &mut R,
484 reader_schema: Option<&Schema>,
485 reader_schemata: Vec<&Schema>,
486) -> AvroResult<Value> {
487 let rs = ResolvedSchema::try_from(writer_schemata)?;
488 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
489 match reader_schema {
490 Some(schema) => {
491 if reader_schemata.is_empty() {
492 value.resolve(schema)
493 } else {
494 value.resolve_schemata(schema, reader_schemata)
495 }
496 }
497 None => Ok(value),
498 }
499}
500
/// Reads header-prefixed single values into `Value`s: the input must start
/// with `expected_header`, after which the payload is decoded with
/// `write_schema`.
pub struct GenericSingleObjectReader {
    /// Resolved schema used to decode the payload.
    write_schema: ResolvedOwnedSchema,
    /// Header bytes every message is required to start with.
    expected_header: Vec<u8>,
}
505
506impl GenericSingleObjectReader {
507 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
508 let header_builder = RabinFingerprintHeader::from_schema(&schema);
509 Self::new_with_header_builder(schema, header_builder)
510 }
511
512 pub fn new_with_header_builder<HB: HeaderBuilder>(
513 schema: Schema,
514 header_builder: HB,
515 ) -> AvroResult<GenericSingleObjectReader> {
516 let expected_header = header_builder.build_header();
517 Ok(GenericSingleObjectReader {
518 write_schema: ResolvedOwnedSchema::try_from(schema)?,
519 expected_header,
520 })
521 }
522
523 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
524 let mut header = vec![0; self.expected_header.len()];
525 match reader.read_exact(&mut header) {
526 Ok(_) => {
527 if self.expected_header == header {
528 decode_internal(
529 self.write_schema.get_root_schema(),
530 self.write_schema.get_names(),
531 &None,
532 reader,
533 )
534 } else {
535 Err(Error::SingleObjectHeaderMismatch(
536 self.expected_header.clone(),
537 header,
538 ))
539 }
540 }
541 Err(io_error) => Err(Error::ReadHeader(io_error)),
542 }
543 }
544}
545
/// Typed wrapper around `GenericSingleObjectReader` that uses the schema
/// provided by `T::get_schema()` and converts the decoded value into `T`.
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Untyped reader that performs the header check and decoding.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a value of that type.
    _model: PhantomData<T>,
}
553
554impl<T> SpecificSingleObjectReader<T>
555where
556 T: AvroSchema,
557{
558 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
559 Ok(SpecificSingleObjectReader {
560 inner: GenericSingleObjectReader::new(T::get_schema())?,
561 _model: PhantomData,
562 })
563 }
564}
565
566impl<T> SpecificSingleObjectReader<T>
567where
568 T: AvroSchema + From<Value>,
569{
570 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
571 self.inner.read_value(reader).map(|v| v.into())
572 }
573}
574
575impl<T> SpecificSingleObjectReader<T>
576where
577 T: AvroSchema + DeserializeOwned,
578{
579 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
580 from_value::<T>(&self.inner.read_value(reader)?)
581 }
582}
583
/// Returns the last 16 bytes of `bytes` as a marker array.
///
/// A slice of exactly 16 bytes is accepted and returned whole; the previous
/// `> 16` check rejected it even though it contains a complete marker.
///
/// # Panics
/// Panics if `bytes` is shorter than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    // `u8` is `Copy`, so this is a plain memory copy.
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
594
595#[cfg(test)]
596mod tests {
597 use super::*;
598 use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
599 use apache_avro_test_helper::TestResult;
600 use pretty_assertions::assert_eq;
601 use serde::Deserialize;
602 use std::io::Cursor;
603 use uuid::Uuid;
604
    /// Record schema `test` shared by the tests: a `long` field `a` with
    /// default 42 and a `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    /// Union schema of `null` and `long`, used by `test_null_union`.
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// A complete encoded stream: the `Obj\x01` magic, a metadata map carrying
    /// the `test` record schema and the `null` codec, a 16-byte marker, and one
    /// block holding the records (27, "foo") and (42, "bar") followed by the
    /// same marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];
639
640 #[test]
641 fn test_from_avro_datum() -> TestResult {
642 let schema = Schema::parse_str(SCHEMA)?;
643 let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
644
645 let mut record = Record::new(&schema).unwrap();
646 record.put("a", 27i64);
647 record.put("b", "foo");
648 let expected = record.into();
649
650 assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
651
652 Ok(())
653 }
654
655 #[test]
656 fn test_from_avro_datum_with_union_to_struct() -> TestResult {
657 const TEST_RECORD_SCHEMA_3240: &str = r#"
658 {
659 "type": "record",
660 "name": "test",
661 "fields": [
662 {
663 "name": "a",
664 "type": "long",
665 "default": 42
666 },
667 {
668 "name": "b",
669 "type": "string"
670 },
671 {
672 "name": "a_nullable_array",
673 "type": ["null", {"type": "array", "items": {"type": "string"}}],
674 "default": null
675 },
676 {
677 "name": "a_nullable_boolean",
678 "type": ["null", {"type": "boolean"}],
679 "default": null
680 },
681 {
682 "name": "a_nullable_string",
683 "type": ["null", {"type": "string"}],
684 "default": null
685 }
686 ]
687 }
688 "#;
689 #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
690 struct TestRecord3240 {
691 a: i64,
692 b: String,
693 a_nullable_array: Option<Vec<String>>,
694 a_nullable_string: Option<String>,
697 }
698
699 let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
700 let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
701
702 let expected_record: TestRecord3240 = TestRecord3240 {
703 a: 27i64,
704 b: String::from("foo"),
705 a_nullable_array: None,
706 a_nullable_string: None,
707 };
708
709 let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
710 let parsed_record: TestRecord3240 = match &avro_datum {
711 Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
712 unexpected => {
713 panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
714 }
715 };
716
717 assert_eq!(parsed_record, expected_record);
718
719 Ok(())
720 }
721
722 #[test]
723 fn test_null_union() -> TestResult {
724 let schema = Schema::parse_str(UNION_SCHEMA)?;
725 let mut encoded: &'static [u8] = &[2, 0];
726
727 assert_eq!(
728 from_avro_datum(&schema, &mut encoded, None)?,
729 Value::Union(1, Box::new(Value::Long(0)))
730 );
731
732 Ok(())
733 }
734
735 #[test]
736 fn test_reader_iterator() -> TestResult {
737 let schema = Schema::parse_str(SCHEMA)?;
738 let reader = Reader::with_schema(&schema, ENCODED)?;
739
740 let mut record1 = Record::new(&schema).unwrap();
741 record1.put("a", 27i64);
742 record1.put("b", "foo");
743
744 let mut record2 = Record::new(&schema).unwrap();
745 record2.put("a", 42i64);
746 record2.put("b", "bar");
747
748 let expected = [record1.into(), record2.into()];
749
750 for (i, value) in reader.enumerate() {
751 assert_eq!(value?, expected[i]);
752 }
753
754 Ok(())
755 }
756
757 #[test]
758 fn test_reader_invalid_header() -> TestResult {
759 let schema = Schema::parse_str(SCHEMA)?;
760 let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
761 assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
762
763 Ok(())
764 }
765
766 #[test]
767 fn test_reader_invalid_block() -> TestResult {
768 let schema = Schema::parse_str(SCHEMA)?;
769 let invalid = ENCODED
770 .iter()
771 .copied()
772 .rev()
773 .skip(19)
774 .collect::<Vec<u8>>()
775 .into_iter()
776 .rev()
777 .collect::<Vec<u8>>();
778 let reader = Reader::with_schema(&schema, &invalid[..])?;
779 for value in reader {
780 assert!(value.is_err());
781 }
782
783 Ok(())
784 }
785
786 #[test]
787 fn test_reader_empty_buffer() -> TestResult {
788 let empty = Cursor::new(Vec::new());
789 assert!(Reader::new(empty).is_err());
790
791 Ok(())
792 }
793
794 #[test]
795 fn test_reader_only_header() -> TestResult {
796 let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
797 let reader = Reader::new(&invalid[..])?;
798 for value in reader {
799 assert!(value.is_err());
800 }
801
802 Ok(())
803 }
804
805 #[test]
806 fn test_avro_3405_read_user_metadata_success() -> TestResult {
807 use crate::writer::Writer;
808
809 let schema = Schema::parse_str(SCHEMA)?;
810 let mut writer = Writer::new(&schema, Vec::new());
811
812 let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
813 user_meta_data.insert(
814 "stringKey".to_string(),
815 "stringValue".to_string().into_bytes(),
816 );
817 user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
818 user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
819
820 for (k, v) in user_meta_data.iter() {
821 writer.add_user_metadata(k.to_string(), v)?;
822 }
823
824 let mut record = Record::new(&schema).unwrap();
825 record.put("a", 27i64);
826 record.put("b", "foo");
827
828 writer.append(record.clone())?;
829 writer.append(record.clone())?;
830 writer.flush()?;
831 let result = writer.into_inner()?;
832
833 let reader = Reader::new(&result[..])?;
834 assert_eq!(reader.user_metadata(), &user_meta_data);
835
836 Ok(())
837 }
838
    /// Model type for the single-object reader tests; its fields match the
    /// record schema returned by its `AvroSchema` implementation.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        /// Schema field `a` (`long`).
        a: i64,
        /// Schema field `b` (`double`).
        b: f64,
        /// Schema field `c` (array of `string`).
        c: Vec<String>,
    }
845
846 impl AvroSchema for TestSingleObjectReader {
847 fn get_schema() -> Schema {
848 let schema = r#"
849 {
850 "type":"record",
851 "name":"TestSingleObjectWrtierSerialize",
852 "fields":[
853 {
854 "name":"a",
855 "type":"long"
856 },
857 {
858 "name":"b",
859 "type":"double"
860 },
861 {
862 "name":"c",
863 "type":{
864 "type":"array",
865 "items":"string"
866 }
867 }
868 ]
869 }
870 "#;
871 Schema::parse_str(schema).unwrap()
872 }
873 }
874
875 impl From<Value> for TestSingleObjectReader {
876 fn from(obj: Value) -> TestSingleObjectReader {
877 if let Value::Record(fields) = obj {
878 let mut a = None;
879 let mut b = None;
880 let mut c = vec![];
881 for (field_name, v) in fields {
882 match (field_name.as_str(), v) {
883 ("a", Value::Long(i)) => a = Some(i),
884 ("b", Value::Double(d)) => b = Some(d),
885 ("c", Value::Array(v)) => {
886 for inner_val in v {
887 if let Value::String(s) = inner_val {
888 c.push(s);
889 }
890 }
891 }
892 (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
893 }
894 }
895 TestSingleObjectReader {
896 a: a.unwrap(),
897 b: b.unwrap(),
898 c,
899 }
900 } else {
901 panic!("Expected a Value::Record but was {obj:?}")
902 }
903 }
904 }
905
906 impl From<TestSingleObjectReader> for Value {
907 fn from(obj: TestSingleObjectReader) -> Value {
908 Value::Record(vec![
909 ("a".into(), obj.a.into()),
910 ("b".into(), obj.b.into()),
911 (
912 "c".into(),
913 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
914 ),
915 ])
916 }
917 }
918
919 #[test]
920 fn test_avro_3507_single_object_reader() -> TestResult {
921 let obj = TestSingleObjectReader {
922 a: 42,
923 b: 3.33,
924 c: vec!["cat".into(), "dog".into()],
925 };
926 let mut to_read = Vec::<u8>::new();
927 to_read.extend_from_slice(&[0xC3, 0x01]);
928 to_read.extend_from_slice(
929 &TestSingleObjectReader::get_schema()
930 .fingerprint::<Rabin>()
931 .bytes[..],
932 );
933 encode(
934 &obj.clone().into(),
935 &TestSingleObjectReader::get_schema(),
936 &mut to_read,
937 )
938 .expect("Encode should succeed");
939 let mut to_read = &to_read[..];
940 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
941 .expect("Schema should resolve");
942 let val = generic_reader
943 .read_value(&mut to_read)
944 .expect("Should read");
945 let expected_value: Value = obj.into();
946 assert_eq!(expected_value, val);
947
948 Ok(())
949 }
950
951 #[test]
952 fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
953 let obj = TestSingleObjectReader {
954 a: 42,
955 b: 3.33,
956 c: vec!["cat".into(), "dog".into()],
957 };
958 let to_read_1 = [0xC3, 0x01];
960 let mut to_read_2 = Vec::<u8>::new();
961 to_read_2.extend_from_slice(
962 &TestSingleObjectReader::get_schema()
963 .fingerprint::<Rabin>()
964 .bytes[..],
965 );
966 let mut to_read_3 = Vec::<u8>::new();
967 encode(
968 &obj.clone().into(),
969 &TestSingleObjectReader::get_schema(),
970 &mut to_read_3,
971 )
972 .expect("Encode should succeed");
973 let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
974 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
975 .expect("Schema should resolve");
976 let val = generic_reader
977 .read_value(&mut to_read)
978 .expect("Should read");
979 let expected_value: Value = obj.into();
980 assert_eq!(expected_value, val);
981
982 Ok(())
983 }
984
985 #[test]
986 fn test_avro_3507_reader_parity() -> TestResult {
987 let obj = TestSingleObjectReader {
988 a: 42,
989 b: 3.33,
990 c: vec!["cat".into(), "dog".into()],
991 };
992
993 let mut to_read = Vec::<u8>::new();
994 to_read.extend_from_slice(&[0xC3, 0x01]);
995 to_read.extend_from_slice(
996 &TestSingleObjectReader::get_schema()
997 .fingerprint::<Rabin>()
998 .bytes[..],
999 );
1000 encode(
1001 &obj.clone().into(),
1002 &TestSingleObjectReader::get_schema(),
1003 &mut to_read,
1004 )
1005 .expect("Encode should succeed");
1006 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
1007 .expect("Schema should resolve");
1008 let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
1009 .expect("schema should resolve");
1010 let mut to_read1 = &to_read[..];
1011 let mut to_read2 = &to_read[..];
1012 let mut to_read3 = &to_read[..];
1013
1014 let val = generic_reader
1015 .read_value(&mut to_read1)
1016 .expect("Should read");
1017 let read_obj1 = specific_reader
1018 .read_from_value(&mut to_read2)
1019 .expect("Should read from value");
1020 let read_obj2 = specific_reader
1021 .read(&mut to_read3)
1022 .expect("Should read from deserilize");
1023 let expected_value: Value = obj.clone().into();
1024 assert_eq!(obj, read_obj1);
1025 assert_eq!(obj, read_obj2);
1026 assert_eq!(val, expected_value);
1027
1028 Ok(())
1029 }
1030
1031 #[test]
1032 fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
1033 let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1034 let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1035 let generic_reader = GenericSingleObjectReader::new_with_header_builder(
1036 TestSingleObjectReader::get_schema(),
1037 header_builder,
1038 )
1039 .expect("failed to build reader");
1040 let data_to_read: Vec<u8> = vec![
1041 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
1042 ];
1043 let mut to_read = &data_to_read[..];
1044 let read_result = generic_reader.read_value(&mut to_read);
1045 matches!(read_result, Err(crate::Error::ReadBytes(_)));
1046 Ok(())
1047 }
1048
1049 #[cfg(not(feature = "snappy"))]
1050 #[test]
1051 fn test_avro_3549_read_not_enabled_codec() {
1052 let snappy_compressed_avro = vec![
1053 79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
1054 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
1055 110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
1056 103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
1057 44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
1058 108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
1059 58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
1060 101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
1061 203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
1062 209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
1063 ];
1064
1065 if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
1066 assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
1067 } else {
1068 panic!("Expected an error in the reading of the codec!");
1069 }
1070 }
1071}