1use crate::{
20 AvroResult, Codec, Error,
21 decode::{decode, decode_internal},
22 error::Details,
23 from_value,
24 headers::{HeaderBuilder, RabinFingerprintHeader},
25 schema::{
26 Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
27 resolve_names_with_schemata,
28 },
29 serde::AvroSchema,
30 types::Value,
31 util,
32};
33use log::warn;
34use serde::de::DeserializeOwned;
35use serde_json::from_slice;
36use std::{
37 collections::HashMap,
38 io::{ErrorKind, Read},
39 marker::PhantomData,
40 str::FromStr,
41};
42
/// Internal reader for the blocks of an Avro object container file.
///
/// The header (writer schema, codec, sync marker, user metadata) is parsed once on
/// construction; values are then decoded one at a time from the currently buffered block.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Underlying byte source.
    reader: R,
    /// Data of the current block, after decompression.
    buf: Vec<u8>,
    /// Read cursor into `buf`: bytes before this index have already been decoded.
    buf_idx: usize,
    /// Number of values of the current block that have not been read yet.
    message_count: usize,
    /// Sync marker read from the header; every block must be followed by these 16 bytes.
    marker: [u8; 16],
    /// Codec declared in the header (`avro.codec`), used to decompress each block.
    codec: Codec,
    /// Schema parsed from the header's `avro.schema` entry.
    writer_schema: Schema,
    /// Extra schemas whose named types the writer schema may reference.
    schemata: Vec<&'r Schema>,
    /// Header metadata entries outside the reserved `avro.` namespace.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Named types available while decoding values of `writer_schema`.
    names_refs: Names,
}
59
60impl<'r, R: Read> Block<'r, R> {
61 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
62 let mut block = Block {
63 reader,
64 codec: Codec::Null,
65 writer_schema: Schema::Null,
66 schemata,
67 buf: vec![],
68 buf_idx: 0,
69 message_count: 0,
70 marker: [0; 16],
71 user_metadata: Default::default(),
72 names_refs: Default::default(),
73 };
74
75 block.read_header()?;
76 Ok(block)
77 }
78
79 fn read_header(&mut self) -> AvroResult<()> {
82 let mut buf = [0u8; 4];
83 self.reader
84 .read_exact(&mut buf)
85 .map_err(Details::ReadHeader)?;
86
87 if buf != [b'O', b'b', b'j', 1u8] {
88 return Err(Details::HeaderMagic.into());
89 }
90
91 let meta_schema = Schema::map(Schema::Bytes);
92 match decode(&meta_schema, &mut self.reader)? {
93 Value::Map(metadata) => {
94 self.read_writer_schema(&metadata)?;
95 self.codec = read_codec(&metadata)?;
96
97 for (key, value) in metadata {
98 if key == "avro.schema"
99 || key == "avro.codec"
100 || key == "avro.codec.compression_level"
101 {
102 } else if key.starts_with("avro.") {
104 warn!("Ignoring unknown metadata key: {key}");
105 } else {
106 self.read_user_metadata(key, value);
107 }
108 }
109 }
110 _ => {
111 return Err(Details::GetHeaderMetadata.into());
112 }
113 }
114
115 self.reader
116 .read_exact(&mut self.marker)
117 .map_err(|e| Details::ReadMarker(e).into())
118 }
119
120 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
121 self.buf.resize(util::safe_len(n)?, 0);
133 self.reader
134 .read_exact(&mut self.buf)
135 .map_err(Details::ReadIntoBuf)?;
136 self.buf_idx = 0;
137 Ok(())
138 }
139
140 fn read_block_next(&mut self) -> AvroResult<()> {
143 assert!(self.is_empty(), "Expected self to be empty!");
144 match util::read_long(&mut self.reader).map_err(Error::into_details) {
145 Ok(block_len) => {
146 self.message_count = block_len as usize;
147 let block_bytes = util::read_long(&mut self.reader)?;
148 self.fill_buf(block_bytes as usize)?;
149 let mut marker = [0u8; 16];
150 self.reader
151 .read_exact(&mut marker)
152 .map_err(Details::ReadBlockMarker)?;
153
154 if marker != self.marker {
155 return Err(Details::GetBlockMarker.into());
156 }
157
158 self.codec.decompress(&mut self.buf)
165 }
166 Err(Details::ReadVariableIntegerBytes(io_err)) => {
167 if let ErrorKind::UnexpectedEof = io_err.kind() {
168 Ok(())
170 } else {
171 Err(Details::ReadVariableIntegerBytes(io_err).into())
172 }
173 }
174 Err(e) => Err(Error::new(e)),
175 }
176 }
177
178 fn len(&self) -> usize {
179 self.message_count
180 }
181
182 fn is_empty(&self) -> bool {
183 self.len() == 0
184 }
185
186 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
187 if self.is_empty() {
188 self.read_block_next()?;
189 if self.is_empty() {
190 return Ok(None);
191 }
192 }
193
194 let mut block_bytes = &self.buf[self.buf_idx..];
195 let b_original = block_bytes.len();
196
197 let item = decode_internal(
198 &self.writer_schema,
199 &self.names_refs,
200 &None,
201 &mut block_bytes,
202 )?;
203 let item = match read_schema {
204 Some(schema) => item.resolve(schema)?,
205 None => item,
206 };
207
208 if b_original != 0 && b_original == block_bytes.len() {
209 return Err(Details::ReadBlock.into());
211 }
212 self.buf_idx += b_original - block_bytes.len();
213 self.message_count -= 1;
214 Ok(Some(item))
215 }
216
217 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
218 let json: serde_json::Value = metadata
219 .get("avro.schema")
220 .and_then(|bytes| {
221 if let Value::Bytes(ref bytes) = *bytes {
222 from_slice(bytes.as_ref()).ok()
223 } else {
224 None
225 }
226 })
227 .ok_or(Details::GetAvroSchemaFromMap)?;
228 if !self.schemata.is_empty() {
229 let mut names = HashMap::new();
230 resolve_names_with_schemata(
231 self.schemata.iter().copied(),
232 &mut names,
233 &None,
234 &HashMap::new(),
235 )?;
236 self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
237 self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
238 } else {
239 self.writer_schema = Schema::parse(&json)?;
240 let mut names = HashMap::new();
241 resolve_names(&self.writer_schema, &mut names, &None, &HashMap::new())?;
242 self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
243 }
244 Ok(())
245 }
246
247 fn read_user_metadata(&mut self, key: String, value: Value) {
248 match value {
249 Value::Bytes(ref vec) => {
250 self.user_metadata.insert(key, vec.clone());
251 }
252 wrong => {
253 warn!("User metadata values must be Value::Bytes, found {wrong:?}");
254 }
255 }
256 }
257}
258
259fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
260 let result = metadata
261 .get("avro.codec")
262 .map(|codec| {
263 if let Value::Bytes(ref bytes) = *codec {
264 match std::str::from_utf8(bytes.as_ref()) {
265 Ok(utf8) => Ok(utf8),
266 Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
267 }
268 } else {
269 Err(Details::BadCodecMetadata.into())
270 }
271 })
272 .map(|codec_res| match codec_res {
273 Ok(codec) => match Codec::from_str(codec) {
274 Ok(codec) => match codec {
275 #[cfg(feature = "bzip")]
276 Codec::Bzip2(_) => {
277 use crate::Bzip2Settings;
278 if let Some(Value::Bytes(bytes)) =
279 metadata.get("avro.codec.compression_level")
280 {
281 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
282 } else {
283 Ok(codec)
284 }
285 }
286 #[cfg(feature = "xz")]
287 Codec::Xz(_) => {
288 use crate::XzSettings;
289 if let Some(Value::Bytes(bytes)) =
290 metadata.get("avro.codec.compression_level")
291 {
292 Ok(Codec::Xz(XzSettings::new(bytes[0])))
293 } else {
294 Ok(codec)
295 }
296 }
297 #[cfg(feature = "zstandard")]
298 Codec::Zstandard(_) => {
299 use crate::ZstandardSettings;
300 if let Some(Value::Bytes(bytes)) =
301 metadata.get("avro.codec.compression_level")
302 {
303 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
304 } else {
305 Ok(codec)
306 }
307 }
308 _ => Ok(codec),
309 },
310 Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
311 },
312 Err(err) => Err(err),
313 });
314
315 result.unwrap_or(Ok(Codec::Null))
316}
317
/// Iterator over the values stored in an Avro object container file.
///
/// Each item is an `AvroResult<Value>`. After an error has been yielded, further calls to
/// `next` return `None`.
pub struct Reader<'a, R> {
    /// Block-level state: header information and the currently buffered block.
    block: Block<'a, R>,
    /// Schema that decoded values are resolved against, if one was supplied.
    reader_schema: Option<&'a Schema>,
    /// Set once an error has been yielded; stops further reads.
    errored: bool,
    /// Whether values must be resolved to `reader_schema` (it differs from the writer schema).
    should_resolve_schema: bool,
}
339
340impl<'a, R: Read> Reader<'a, R> {
341 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
346 let block = Block::new(reader, vec![])?;
347 let reader = Reader {
348 block,
349 reader_schema: None,
350 errored: false,
351 should_resolve_schema: false,
352 };
353 Ok(reader)
354 }
355
356 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
361 let block = Block::new(reader, vec![schema])?;
362 let mut reader = Reader {
363 block,
364 reader_schema: Some(schema),
365 errored: false,
366 should_resolve_schema: false,
367 };
368 reader.should_resolve_schema = reader.writer_schema() != schema;
370 Ok(reader)
371 }
372
373 pub fn with_schemata(
378 schema: &'a Schema,
379 schemata: Vec<&'a Schema>,
380 reader: R,
381 ) -> AvroResult<Reader<'a, R>> {
382 let block = Block::new(reader, schemata)?;
383 let mut reader = Reader {
384 block,
385 reader_schema: Some(schema),
386 errored: false,
387 should_resolve_schema: false,
388 };
389 reader.should_resolve_schema = reader.writer_schema() != schema;
391 Ok(reader)
392 }
393
394 #[inline]
396 pub fn writer_schema(&self) -> &Schema {
397 &self.block.writer_schema
398 }
399
400 #[inline]
402 pub fn reader_schema(&self) -> Option<&Schema> {
403 self.reader_schema
404 }
405
406 #[inline]
408 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
409 &self.block.user_metadata
410 }
411
412 #[inline]
413 fn read_next(&mut self) -> AvroResult<Option<Value>> {
414 let read_schema = if self.should_resolve_schema {
415 self.reader_schema
416 } else {
417 None
418 };
419
420 self.block.read_next(read_schema)
421 }
422}
423
424impl<R: Read> Iterator for Reader<'_, R> {
425 type Item = AvroResult<Value>;
426
427 fn next(&mut self) -> Option<Self::Item> {
428 if self.errored {
430 return None;
431 };
432 match self.read_next() {
433 Ok(opt) => opt.map(Ok),
434 Err(e) => {
435 self.errored = true;
436 Some(Err(e))
437 }
438 }
439 }
440}
441
442pub fn from_avro_datum<R: Read>(
451 writer_schema: &Schema,
452 reader: &mut R,
453 reader_schema: Option<&Schema>,
454) -> AvroResult<Value> {
455 let value = decode(writer_schema, reader)?;
456 match reader_schema {
457 Some(schema) => value.resolve(schema),
458 None => Ok(value),
459 }
460}
461
462pub fn from_avro_datum_schemata<R: Read>(
469 writer_schema: &Schema,
470 writer_schemata: Vec<&Schema>,
471 reader: &mut R,
472 reader_schema: Option<&Schema>,
473) -> AvroResult<Value> {
474 from_avro_datum_reader_schemata(
475 writer_schema,
476 writer_schemata,
477 reader,
478 reader_schema,
479 Vec::with_capacity(0),
480 )
481}
482
483pub fn from_avro_datum_reader_schemata<R: Read>(
490 writer_schema: &Schema,
491 writer_schemata: Vec<&Schema>,
492 reader: &mut R,
493 reader_schema: Option<&Schema>,
494 reader_schemata: Vec<&Schema>,
495) -> AvroResult<Value> {
496 let rs = ResolvedSchema::try_from(writer_schemata)?;
497 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
498 match reader_schema {
499 Some(schema) => {
500 if reader_schemata.is_empty() {
501 value.resolve(schema)
502 } else {
503 value.resolve_schemata(schema, reader_schemata)
504 }
505 }
506 None => Ok(value),
507 }
508}
509
/// Reader for single-object encoded data: an expected header followed by one datum.
pub struct GenericSingleObjectReader {
    /// Resolved schema, including its named types, used to decode the datum.
    write_schema: ResolvedOwnedSchema,
    /// Bytes every input must start with, produced by a `HeaderBuilder`.
    expected_header: Vec<u8>,
}
514
515impl GenericSingleObjectReader {
516 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
517 let header_builder = RabinFingerprintHeader::from_schema(&schema);
518 Self::new_with_header_builder(schema, header_builder)
519 }
520
521 pub fn new_with_header_builder<HB: HeaderBuilder>(
522 schema: Schema,
523 header_builder: HB,
524 ) -> AvroResult<GenericSingleObjectReader> {
525 let expected_header = header_builder.build_header();
526 Ok(GenericSingleObjectReader {
527 write_schema: ResolvedOwnedSchema::try_from(schema)?,
528 expected_header,
529 })
530 }
531
532 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
533 let mut header = vec![0; self.expected_header.len()];
534 match reader.read_exact(&mut header) {
535 Ok(_) => {
536 if self.expected_header == header {
537 decode_internal(
538 self.write_schema.get_root_schema(),
539 self.write_schema.get_names(),
540 &None,
541 reader,
542 )
543 } else {
544 Err(
545 Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
546 .into(),
547 )
548 }
549 }
550 Err(io_error) => Err(Details::ReadHeader(io_error).into()),
551 }
552 }
553}
554
/// Single-object reader that produces values of the Rust type `T`, whose Avro schema is
/// obtained via `AvroSchema`.
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Generic reader configured with `T::get_schema()`.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a value of it.
    _model: PhantomData<T>,
}
562
563impl<T> SpecificSingleObjectReader<T>
564where
565 T: AvroSchema,
566{
567 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
568 Ok(SpecificSingleObjectReader {
569 inner: GenericSingleObjectReader::new(T::get_schema())?,
570 _model: PhantomData,
571 })
572 }
573}
574
575impl<T> SpecificSingleObjectReader<T>
576where
577 T: AvroSchema + From<Value>,
578{
579 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
580 self.inner.read_value(reader).map(|v| v.into())
581 }
582}
583
584impl<T> SpecificSingleObjectReader<T>
585where
586 T: AvroSchema + DeserializeOwned,
587{
588 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
589 from_value::<T>(&self.inner.read_value(reader)?)
590 }
591}
592
/// Returns the last 16 bytes of `bytes` as a sync marker.
///
/// In an Avro object container file, every data block is followed by the 16-byte sync
/// marker, so the tail of complete container bytes is that marker.
///
/// # Panics
/// Panics if `bytes` holds fewer than 16 bytes. Exactly 16 bytes are accepted; previously
/// `> 16` was required, an off-by-one.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use serde::Deserialize;
    use std::io::Cursor;
    use uuid::Uuid;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);

        Ok(())
    }

    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None)?,
            Value::Union(1, Box::new(Value::Long(0)))
        );

        Ok(())
    }

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::with_schema(&schema, ENCODED)?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected = [record1.into(), record2.into()];

        // Count what was read so the test cannot pass vacuously on an empty iterator.
        let mut read = 0;
        for (i, value) in reader.enumerate() {
            assert_eq!(value?, expected[i]);
            read += 1;
        }
        assert_eq!(read, expected.len());

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        assert!(Reader::with_schema(&schema, &mut invalid).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::with_schema(&schema, &mut invalid)?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    impl From<Value> for TestSingleObjectReader {
        fn from(obj: Value) -> TestSingleObjectReader {
            if let Value::Record(fields) = obj {
                let mut a = None;
                let mut b = None;
                let mut c = vec![];
                for (field_name, v) in fields {
                    match (field_name.as_str(), v) {
                        ("a", Value::Long(i)) => a = Some(i),
                        ("b", Value::Double(d)) => b = Some(d),
                        ("c", Value::Array(v)) => {
                            for inner_val in v {
                                if let Value::String(s) = inner_val {
                                    c.push(s);
                                }
                            }
                        }
                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                    }
                }
                TestSingleObjectReader {
                    a: a.unwrap(),
                    b: b.unwrap(),
                    c,
                }
            } else {
                panic!("Expected a Value::Record but was {obj:?}")
            }
        }
    }

    impl From<TestSingleObjectReader> for Value {
        fn from(obj: TestSingleObjectReader) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let mut to_read = &to_read[..];
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let to_read_1 = [0xC3, 0x01];
        let mut to_read_2 = Vec::<u8>::new();
        to_read_2.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        let mut to_read_3 = Vec::<u8>::new();
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read_3,
        )
        .expect("Encode should succeed");
        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");
        let mut to_read1 = &to_read[..];
        let mut to_read2 = &to_read[..];
        let mut to_read3 = &to_read[..];

        let val = generic_reader
            .read_value(&mut to_read1)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut to_read2)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut to_read3)
            .expect("Should read from deserilize");
        let expected_value: Value = obj.clone().into();
        assert_eq!(obj, read_obj1);
        assert_eq!(obj, read_obj2);
        assert_eq!(val, expected_value);

        Ok(())
    }

    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
            TestSingleObjectReader::get_schema(),
            header_builder,
        )
        .expect("failed to build reader");
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
        ];
        let mut to_read = &data_to_read[..];
        let read_result = generic_reader
            .read_value(&mut to_read)
            .map_err(Error::into_details);
        // The input holds only the Glue header and no payload. Decoding must therefore fail,
        // but only after the alternate header was accepted. A bare `matches!` discards its
        // bool, so these are explicit asserts.
        assert!(read_result.is_err());
        assert!(!matches!(
            read_result,
            Err(Details::SingleObjectHeaderMismatch(..))
        ));
        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
        } else {
            panic!("Expected an error in the reading of the codec!");
        }
    }
}