1use crate::{
20 AvroResult, Codec, Error,
21 decode::{decode, decode_internal},
22 error::Details,
23 from_value,
24 headers::{HeaderBuilder, RabinFingerprintHeader},
25 schema::{
26 AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
27 resolve_names_with_schemata,
28 },
29 types::Value,
30 util,
31};
32use log::warn;
33use serde::de::DeserializeOwned;
34use serde_json::from_slice;
35use std::{
36 collections::HashMap,
37 io::{ErrorKind, Read},
38 marker::PhantomData,
39 str::FromStr,
40};
41
/// Internal state for reading the blocks of an Avro object container file.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// The underlying byte source.
    reader: R,
    /// Bytes of the current block; filled by `fill_buf` and then decompressed in place.
    buf: Vec<u8>,
    /// Offset in `buf` of the next datum to decode.
    buf_idx: usize,
    /// Number of data items of the current block that have not been decoded yet.
    message_count: usize,
    /// Sync marker read from the header; every block must be followed by it.
    marker: [u8; 16],
    /// Codec taken from the `avro.codec` header entry.
    codec: Codec,
    /// Schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Extra schemas used to resolve named types referenced by the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header entries whose keys do not start with `avro.` and whose values are bytes.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Named types available while decoding data items.
    names_refs: Names,
}
58
59impl<'r, R: Read> Block<'r, R> {
60 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
61 let mut block = Block {
62 reader,
63 codec: Codec::Null,
64 writer_schema: Schema::Null,
65 schemata,
66 buf: vec![],
67 buf_idx: 0,
68 message_count: 0,
69 marker: [0; 16],
70 user_metadata: Default::default(),
71 names_refs: Default::default(),
72 };
73
74 block.read_header()?;
75 Ok(block)
76 }
77
78 fn read_header(&mut self) -> AvroResult<()> {
81 let mut buf = [0u8; 4];
82 self.reader
83 .read_exact(&mut buf)
84 .map_err(Details::ReadHeader)?;
85
86 if buf != [b'O', b'b', b'j', 1u8] {
87 return Err(Details::HeaderMagic.into());
88 }
89
90 let meta_schema = Schema::map(Schema::Bytes);
91 match decode(&meta_schema, &mut self.reader)? {
92 Value::Map(metadata) => {
93 self.read_writer_schema(&metadata)?;
94 self.codec = read_codec(&metadata)?;
95
96 for (key, value) in metadata {
97 if key == "avro.schema"
98 || key == "avro.codec"
99 || key == "avro.codec.compression_level"
100 {
101 } else if key.starts_with("avro.") {
103 warn!("Ignoring unknown metadata key: {key}");
104 } else {
105 self.read_user_metadata(key, value);
106 }
107 }
108 }
109 _ => {
110 return Err(Details::GetHeaderMetadata.into());
111 }
112 }
113
114 self.reader
115 .read_exact(&mut self.marker)
116 .map_err(|e| Details::ReadMarker(e).into())
117 }
118
119 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
120 self.buf.resize(util::safe_len(n)?, 0);
132 self.reader
133 .read_exact(&mut self.buf)
134 .map_err(Details::ReadIntoBuf)?;
135 self.buf_idx = 0;
136 Ok(())
137 }
138
139 fn read_block_next(&mut self) -> AvroResult<()> {
142 assert!(self.is_empty(), "Expected self to be empty!");
143 match util::read_long(&mut self.reader).map_err(Error::into_details) {
144 Ok(block_len) => {
145 self.message_count = block_len as usize;
146 let block_bytes = util::read_long(&mut self.reader)?;
147 self.fill_buf(block_bytes as usize)?;
148 let mut marker = [0u8; 16];
149 self.reader
150 .read_exact(&mut marker)
151 .map_err(Details::ReadBlockMarker)?;
152
153 if marker != self.marker {
154 return Err(Details::GetBlockMarker.into());
155 }
156
157 self.codec.decompress(&mut self.buf)
164 }
165 Err(Details::ReadVariableIntegerBytes(io_err)) => {
166 if let ErrorKind::UnexpectedEof = io_err.kind() {
167 Ok(())
169 } else {
170 Err(Details::ReadVariableIntegerBytes(io_err).into())
171 }
172 }
173 Err(e) => Err(Error::new(e)),
174 }
175 }
176
177 fn len(&self) -> usize {
178 self.message_count
179 }
180
181 fn is_empty(&self) -> bool {
182 self.len() == 0
183 }
184
185 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
186 if self.is_empty() {
187 self.read_block_next()?;
188 if self.is_empty() {
189 return Ok(None);
190 }
191 }
192
193 let mut block_bytes = &self.buf[self.buf_idx..];
194 let b_original = block_bytes.len();
195
196 let item = decode_internal(
197 &self.writer_schema,
198 &self.names_refs,
199 &None,
200 &mut block_bytes,
201 )?;
202 let item = match read_schema {
203 Some(schema) => item.resolve(schema)?,
204 None => item,
205 };
206
207 if b_original != 0 && b_original == block_bytes.len() {
208 return Err(Details::ReadBlock.into());
210 }
211 self.buf_idx += b_original - block_bytes.len();
212 self.message_count -= 1;
213 Ok(Some(item))
214 }
215
216 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
217 let json: serde_json::Value = metadata
218 .get("avro.schema")
219 .and_then(|bytes| {
220 if let Value::Bytes(ref bytes) = *bytes {
221 from_slice(bytes.as_ref()).ok()
222 } else {
223 None
224 }
225 })
226 .ok_or(Details::GetAvroSchemaFromMap)?;
227 if !self.schemata.is_empty() {
228 let rs = ResolvedSchema::try_from(self.schemata.clone())?;
229 let names: Names = rs
230 .get_names()
231 .iter()
232 .map(|(name, schema)| (name.clone(), (*schema).clone()))
233 .collect();
234 self.writer_schema = Schema::parse_with_names(&json, names)?;
235 resolve_names_with_schemata(
236 self.schemata.iter().copied(),
237 &mut self.names_refs,
238 &None,
239 )?;
240 } else {
241 self.writer_schema = Schema::parse(&json)?;
242 resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
243 }
244 Ok(())
245 }
246
247 fn read_user_metadata(&mut self, key: String, value: Value) {
248 match value {
249 Value::Bytes(ref vec) => {
250 self.user_metadata.insert(key, vec.clone());
251 }
252 wrong => {
253 warn!("User metadata values must be Value::Bytes, found {wrong:?}");
254 }
255 }
256 }
257}
258
259fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
260 let result = metadata
261 .get("avro.codec")
262 .map(|codec| {
263 if let Value::Bytes(ref bytes) = *codec {
264 match std::str::from_utf8(bytes.as_ref()) {
265 Ok(utf8) => Ok(utf8),
266 Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
267 }
268 } else {
269 Err(Details::BadCodecMetadata.into())
270 }
271 })
272 .map(|codec_res| match codec_res {
273 Ok(codec) => match Codec::from_str(codec) {
274 Ok(codec) => match codec {
275 #[cfg(feature = "bzip")]
276 Codec::Bzip2(_) => {
277 use crate::Bzip2Settings;
278 if let Some(Value::Bytes(bytes)) =
279 metadata.get("avro.codec.compression_level")
280 {
281 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
282 } else {
283 Ok(codec)
284 }
285 }
286 #[cfg(feature = "xz")]
287 Codec::Xz(_) => {
288 use crate::XzSettings;
289 if let Some(Value::Bytes(bytes)) =
290 metadata.get("avro.codec.compression_level")
291 {
292 Ok(Codec::Xz(XzSettings::new(bytes[0])))
293 } else {
294 Ok(codec)
295 }
296 }
297 #[cfg(feature = "zstandard")]
298 Codec::Zstandard(_) => {
299 use crate::ZstandardSettings;
300 if let Some(Value::Bytes(bytes)) =
301 metadata.get("avro.codec.compression_level")
302 {
303 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
304 } else {
305 Ok(codec)
306 }
307 }
308 _ => Ok(codec),
309 },
310 Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
311 },
312 Err(err) => Err(err),
313 });
314
315 result.unwrap_or(Ok(Codec::Null))
316}
317
318pub struct Reader<'a, R> {
334 block: Block<'a, R>,
335 reader_schema: Option<&'a Schema>,
336 errored: bool,
337 should_resolve_schema: bool,
338}
339
340impl<'a, R: Read> Reader<'a, R> {
341 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
346 let block = Block::new(reader, vec![])?;
347 let reader = Reader {
348 block,
349 reader_schema: None,
350 errored: false,
351 should_resolve_schema: false,
352 };
353 Ok(reader)
354 }
355
356 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
361 let block = Block::new(reader, vec![schema])?;
362 let mut reader = Reader {
363 block,
364 reader_schema: Some(schema),
365 errored: false,
366 should_resolve_schema: false,
367 };
368 reader.should_resolve_schema = reader.writer_schema() != schema;
370 Ok(reader)
371 }
372
373 pub fn with_schemata(
378 schema: &'a Schema,
379 schemata: Vec<&'a Schema>,
380 reader: R,
381 ) -> AvroResult<Reader<'a, R>> {
382 let block = Block::new(reader, schemata)?;
383 let mut reader = Reader {
384 block,
385 reader_schema: Some(schema),
386 errored: false,
387 should_resolve_schema: false,
388 };
389 reader.should_resolve_schema = reader.writer_schema() != schema;
391 Ok(reader)
392 }
393
394 #[inline]
396 pub fn writer_schema(&self) -> &Schema {
397 &self.block.writer_schema
398 }
399
400 #[inline]
402 pub fn reader_schema(&self) -> Option<&Schema> {
403 self.reader_schema
404 }
405
406 #[inline]
408 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
409 &self.block.user_metadata
410 }
411
412 #[inline]
413 fn read_next(&mut self) -> AvroResult<Option<Value>> {
414 let read_schema = if self.should_resolve_schema {
415 self.reader_schema
416 } else {
417 None
418 };
419
420 self.block.read_next(read_schema)
421 }
422}
423
424impl<R: Read> Iterator for Reader<'_, R> {
425 type Item = AvroResult<Value>;
426
427 fn next(&mut self) -> Option<Self::Item> {
428 if self.errored {
430 return None;
431 };
432 match self.read_next() {
433 Ok(opt) => opt.map(Ok),
434 Err(e) => {
435 self.errored = true;
436 Some(Err(e))
437 }
438 }
439 }
440}
441
442pub fn from_avro_datum<R: Read>(
451 writer_schema: &Schema,
452 reader: &mut R,
453 reader_schema: Option<&Schema>,
454) -> AvroResult<Value> {
455 let value = decode(writer_schema, reader)?;
456 match reader_schema {
457 Some(schema) => value.resolve(schema),
458 None => Ok(value),
459 }
460}
461
462pub fn from_avro_datum_schemata<R: Read>(
469 writer_schema: &Schema,
470 writer_schemata: Vec<&Schema>,
471 reader: &mut R,
472 reader_schema: Option<&Schema>,
473) -> AvroResult<Value> {
474 from_avro_datum_reader_schemata(
475 writer_schema,
476 writer_schemata,
477 reader,
478 reader_schema,
479 Vec::with_capacity(0),
480 )
481}
482
483pub fn from_avro_datum_reader_schemata<R: Read>(
490 writer_schema: &Schema,
491 writer_schemata: Vec<&Schema>,
492 reader: &mut R,
493 reader_schema: Option<&Schema>,
494 reader_schemata: Vec<&Schema>,
495) -> AvroResult<Value> {
496 let rs = ResolvedSchema::try_from(writer_schemata)?;
497 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
498 match reader_schema {
499 Some(schema) => {
500 if reader_schemata.is_empty() {
501 value.resolve(schema)
502 } else {
503 value.resolve_schemata(schema, reader_schemata)
504 }
505 }
506 None => Ok(value),
507 }
508}
509
510pub struct GenericSingleObjectReader {
511 write_schema: ResolvedOwnedSchema,
512 expected_header: Vec<u8>,
513}
514
515impl GenericSingleObjectReader {
516 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
517 let header_builder = RabinFingerprintHeader::from_schema(&schema);
518 Self::new_with_header_builder(schema, header_builder)
519 }
520
521 pub fn new_with_header_builder<HB: HeaderBuilder>(
522 schema: Schema,
523 header_builder: HB,
524 ) -> AvroResult<GenericSingleObjectReader> {
525 let expected_header = header_builder.build_header();
526 Ok(GenericSingleObjectReader {
527 write_schema: ResolvedOwnedSchema::try_from(schema)?,
528 expected_header,
529 })
530 }
531
532 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
533 let mut header = vec![0; self.expected_header.len()];
534 match reader.read_exact(&mut header) {
535 Ok(_) => {
536 if self.expected_header == header {
537 decode_internal(
538 self.write_schema.get_root_schema(),
539 self.write_schema.get_names(),
540 &None,
541 reader,
542 )
543 } else {
544 Err(
545 Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
546 .into(),
547 )
548 }
549 }
550 Err(io_error) => Err(Details::ReadHeader(io_error).into()),
551 }
552 }
553}
554
555pub struct SpecificSingleObjectReader<T>
556where
557 T: AvroSchema,
558{
559 inner: GenericSingleObjectReader,
560 _model: PhantomData<T>,
561}
562
563impl<T> SpecificSingleObjectReader<T>
564where
565 T: AvroSchema,
566{
567 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
568 Ok(SpecificSingleObjectReader {
569 inner: GenericSingleObjectReader::new(T::get_schema())?,
570 _model: PhantomData,
571 })
572 }
573}
574
575impl<T> SpecificSingleObjectReader<T>
576where
577 T: AvroSchema + From<Value>,
578{
579 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
580 self.inner.read_value(reader).map(|v| v.into())
581 }
582}
583
584impl<T> SpecificSingleObjectReader<T>
585where
586 T: AvroSchema + DeserializeOwned,
587{
588 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
589 from_value::<T>(&self.inner.read_value(reader)?)
590 }
591}
592
/// Returns the 16-byte sync marker stored at the end of `bytes`, e.g. Avro data
/// produced earlier by a `Writer`.
///
/// # Panics
///
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    // Exactly 16 bytes are enough to hold a marker, so only shorter input is rejected.
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
603
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use serde::Deserialize;
    use std::io::Cursor;
    use uuid::Uuid;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);

        Ok(())
    }

    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None)?,
            Value::Union(1, Box::new(Value::Long(0)))
        );

        Ok(())
    }

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::with_schema(&schema, ENCODED)?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: [Value; 2] = [record1.into(), record2.into()];

        // Compare the whole sequence so a reader that stops early (or yields extra
        // values) is caught, not just the values it happens to produce.
        let values = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(values, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        assert!(Reader::with_schema(&schema, &mut invalid).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::with_schema(&schema, &mut invalid)?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append(record.clone())?;
        writer.append(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    impl From<Value> for TestSingleObjectReader {
        fn from(obj: Value) -> TestSingleObjectReader {
            if let Value::Record(fields) = obj {
                let mut a = None;
                let mut b = None;
                let mut c = vec![];
                for (field_name, v) in fields {
                    match (field_name.as_str(), v) {
                        ("a", Value::Long(i)) => a = Some(i),
                        ("b", Value::Double(d)) => b = Some(d),
                        ("c", Value::Array(v)) => {
                            for inner_val in v {
                                if let Value::String(s) = inner_val {
                                    c.push(s);
                                }
                            }
                        }
                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                    }
                }
                TestSingleObjectReader {
                    a: a.unwrap(),
                    b: b.unwrap(),
                    c,
                }
            } else {
                panic!("Expected a Value::Record but was {obj:?}")
            }
        }
    }

    impl From<TestSingleObjectReader> for Value {
        fn from(obj: TestSingleObjectReader) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let mut to_read = &to_read[..];
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let to_read_1 = [0xC3, 0x01];
        let mut to_read_2 = Vec::<u8>::new();
        to_read_2.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        let mut to_read_3 = Vec::<u8>::new();
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read_3,
        )
        .expect("Encode should succeed");
        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");
        let mut to_read1 = &to_read[..];
        let mut to_read2 = &to_read[..];
        let mut to_read3 = &to_read[..];

        let val = generic_reader
            .read_value(&mut to_read1)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut to_read2)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut to_read3)
            .expect("Should read from deserilize");
        let expected_value: Value = obj.clone().into();
        assert_eq!(obj, read_obj1);
        assert_eq!(obj, read_obj2);
        assert_eq!(val, expected_value);

        Ok(())
    }

    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
            TestSingleObjectReader::get_schema(),
            header_builder,
        )
        .expect("failed to build reader");
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
        ];
        let mut to_read = &data_to_read[..];
        let read_result = generic_reader
            .read_value(&mut to_read)
            .map_err(Error::into_details);
        // The input is exactly the Glue UUID header with no body: the header check must
        // pass, and reading must fail only while decoding the missing datum.
        assert!(
            matches!(
                &read_result,
                Err(details) if !matches!(
                    details,
                    Details::SingleObjectHeaderMismatch(..) | Details::ReadHeader(_)
                )
            ),
            "unexpected result: {read_result:?}"
        );
        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
        } else {
            panic!("Expected an error in the reading of the codec!");
        }
    }
}
1074}