1use crate::{
20 decode::{decode, decode_internal},
21 from_value,
22 headers::{HeaderBuilder, RabinFingerprintHeader},
23 schema::{
24 resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25 ResolvedSchema, Schema,
26 },
27 types::Value,
28 util, AvroResult, Codec, Error,
29};
30use log::warn;
31use serde::de::DeserializeOwned;
32use serde_json::from_slice;
33use std::{
34 collections::HashMap,
35 io::{ErrorKind, Read},
36 marker::PhantomData,
37 str::FromStr,
38};
39
/// Internal reader over the data blocks of an Avro object container file.
///
/// Holds the state parsed from the file header (writer schema, codec, sync
/// marker, user metadata) and the bytes of the block currently being decoded.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Source of the container-file bytes.
    reader: R,
    /// Bytes of the current block; filled by `fill_buf` and then passed to the
    /// codec's `decompress`.
    buf: Vec<u8>,
    /// Offset into `buf` where the next datum starts.
    buf_idx: usize,
    /// Number of datums still to be decoded from the current block.
    message_count: usize,
    /// Sync marker read from the header; every block must end with it.
    marker: [u8; 16],
    /// Codec declared by the `avro.codec` header entry.
    codec: Codec,
    /// Schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Extra schemata supplied by the caller for resolving named types.
    schemata: Vec<&'r Schema>,
    /// Header entries that are not reserved `avro.` keys.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Named types handed to `decode_internal` when decoding datums.
    names_refs: Names,
}
56
57impl<'r, R: Read> Block<'r, R> {
58 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
59 let mut block = Block {
60 reader,
61 codec: Codec::Null,
62 writer_schema: Schema::Null,
63 schemata,
64 buf: vec![],
65 buf_idx: 0,
66 message_count: 0,
67 marker: [0; 16],
68 user_metadata: Default::default(),
69 names_refs: Default::default(),
70 };
71
72 block.read_header()?;
73 Ok(block)
74 }
75
76 fn read_header(&mut self) -> AvroResult<()> {
79 let mut buf = [0u8; 4];
80 self.reader
81 .read_exact(&mut buf)
82 .map_err(Error::ReadHeader)?;
83
84 if buf != [b'O', b'b', b'j', 1u8] {
85 return Err(Error::HeaderMagic);
86 }
87
88 let meta_schema = Schema::map(Schema::Bytes);
89 if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90 self.read_writer_schema(&metadata)?;
91 self.codec = read_codec(&metadata)?;
92
93 for (key, value) in metadata {
94 if key == "avro.schema"
95 || key == "avro.codec"
96 || key == "avro.codec.compression_level"
97 {
98 } else if key.starts_with("avro.") {
100 warn!("Ignoring unknown metadata key: {}", key);
101 } else {
102 self.read_user_metadata(key, value);
103 }
104 }
105 } else {
106 return Err(Error::GetHeaderMetadata);
107 }
108
109 self.reader
110 .read_exact(&mut self.marker)
111 .map_err(Error::ReadMarker)
112 }
113
114 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
115 self.buf.resize(util::safe_len(n)?, 0);
127 self.reader
128 .read_exact(&mut self.buf)
129 .map_err(Error::ReadIntoBuf)?;
130 self.buf_idx = 0;
131 Ok(())
132 }
133
134 fn read_block_next(&mut self) -> AvroResult<()> {
137 assert!(self.is_empty(), "Expected self to be empty!");
138 match util::read_long(&mut self.reader) {
139 Ok(block_len) => {
140 self.message_count = block_len as usize;
141 let block_bytes = util::read_long(&mut self.reader)?;
142 self.fill_buf(block_bytes as usize)?;
143 let mut marker = [0u8; 16];
144 self.reader
145 .read_exact(&mut marker)
146 .map_err(Error::ReadBlockMarker)?;
147
148 if marker != self.marker {
149 return Err(Error::GetBlockMarker);
150 }
151
152 self.codec.decompress(&mut self.buf)
159 }
160 Err(Error::ReadVariableIntegerBytes(io_err)) => {
161 if let ErrorKind::UnexpectedEof = io_err.kind() {
162 Ok(())
164 } else {
165 Err(Error::ReadVariableIntegerBytes(io_err))
166 }
167 }
168 Err(e) => Err(e),
169 }
170 }
171
172 fn len(&self) -> usize {
173 self.message_count
174 }
175
176 fn is_empty(&self) -> bool {
177 self.len() == 0
178 }
179
180 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
181 if self.is_empty() {
182 self.read_block_next()?;
183 if self.is_empty() {
184 return Ok(None);
185 }
186 }
187
188 let mut block_bytes = &self.buf[self.buf_idx..];
189 let b_original = block_bytes.len();
190
191 let item = decode_internal(
192 &self.writer_schema,
193 &self.names_refs,
194 &None,
195 &mut block_bytes,
196 )?;
197 let item = match read_schema {
198 Some(schema) => item.resolve(schema)?,
199 None => item,
200 };
201
202 if b_original != 0 && b_original == block_bytes.len() {
203 return Err(Error::ReadBlock);
205 }
206 self.buf_idx += b_original - block_bytes.len();
207 self.message_count -= 1;
208 Ok(Some(item))
209 }
210
211 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
212 let json: serde_json::Value = metadata
213 .get("avro.schema")
214 .and_then(|bytes| {
215 if let Value::Bytes(ref bytes) = *bytes {
216 from_slice(bytes.as_ref()).ok()
217 } else {
218 None
219 }
220 })
221 .ok_or(Error::GetAvroSchemaFromMap)?;
222 if !self.schemata.is_empty() {
223 let rs = ResolvedSchema::try_from(self.schemata.clone())?;
224 let names: Names = rs
225 .get_names()
226 .iter()
227 .map(|(name, schema)| (name.clone(), (*schema).clone()))
228 .collect();
229 self.writer_schema = Schema::parse_with_names(&json, names)?;
230 resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
231 } else {
232 self.writer_schema = Schema::parse(&json)?;
233 resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
234 }
235 Ok(())
236 }
237
238 fn read_user_metadata(&mut self, key: String, value: Value) {
239 match value {
240 Value::Bytes(ref vec) => {
241 self.user_metadata.insert(key, vec.clone());
242 }
243 wrong => {
244 warn!(
245 "User metadata values must be Value::Bytes, found {:?}",
246 wrong
247 );
248 }
249 }
250 }
251}
252
253fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
254 let result = metadata
255 .get("avro.codec")
256 .map(|codec| {
257 if let Value::Bytes(ref bytes) = *codec {
258 match std::str::from_utf8(bytes.as_ref()) {
259 Ok(utf8) => Ok(utf8),
260 Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
261 }
262 } else {
263 Err(Error::BadCodecMetadata)
264 }
265 })
266 .map(|codec_res| match codec_res {
267 Ok(codec) => match Codec::from_str(codec) {
268 Ok(codec) => match codec {
269 #[cfg(feature = "bzip")]
270 Codec::Bzip2(_) => {
271 use crate::Bzip2Settings;
272 if let Some(Value::Bytes(bytes)) =
273 metadata.get("avro.codec.compression_level")
274 {
275 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
276 } else {
277 Ok(codec)
278 }
279 }
280 #[cfg(feature = "xz")]
281 Codec::Xz(_) => {
282 use crate::XzSettings;
283 if let Some(Value::Bytes(bytes)) =
284 metadata.get("avro.codec.compression_level")
285 {
286 Ok(Codec::Xz(XzSettings::new(bytes[0])))
287 } else {
288 Ok(codec)
289 }
290 }
291 #[cfg(feature = "zstandard")]
292 Codec::Zstandard(_) => {
293 use crate::ZstandardSettings;
294 if let Some(Value::Bytes(bytes)) =
295 metadata.get("avro.codec.compression_level")
296 {
297 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
298 } else {
299 Ok(codec)
300 }
301 }
302 _ => Ok(codec),
303 },
304 Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
305 },
306 Err(err) => Err(err),
307 });
308
309 result.unwrap_or(Ok(Codec::Null))
310}
311
/// Iterator over the values stored in an Avro object container file.
///
/// Each item is the next decoded value. If a reader schema was supplied and
/// it differs from the writer schema, each value is resolved into it. Once an
/// error has been yielded, iteration stops.
pub struct Reader<'a, R> {
    /// Block-level reader holding the parsed header and the current block.
    block: Block<'a, R>,
    /// Optional schema that decoded values are resolved into.
    reader_schema: Option<&'a Schema>,
    /// Set after an error has been yielded; later calls to `next` return `None`.
    errored: bool,
    /// Whether `reader_schema` differs from the writer schema and so must be applied.
    should_resolve_schema: bool,
}
333
334impl<'a, R: Read> Reader<'a, R> {
335 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
340 let block = Block::new(reader, vec![])?;
341 let reader = Reader {
342 block,
343 reader_schema: None,
344 errored: false,
345 should_resolve_schema: false,
346 };
347 Ok(reader)
348 }
349
350 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
355 let block = Block::new(reader, vec![schema])?;
356 let mut reader = Reader {
357 block,
358 reader_schema: Some(schema),
359 errored: false,
360 should_resolve_schema: false,
361 };
362 reader.should_resolve_schema = reader.writer_schema() != schema;
364 Ok(reader)
365 }
366
367 pub fn with_schemata(
372 schema: &'a Schema,
373 schemata: Vec<&'a Schema>,
374 reader: R,
375 ) -> AvroResult<Reader<'a, R>> {
376 let block = Block::new(reader, schemata)?;
377 let mut reader = Reader {
378 block,
379 reader_schema: Some(schema),
380 errored: false,
381 should_resolve_schema: false,
382 };
383 reader.should_resolve_schema = reader.writer_schema() != schema;
385 Ok(reader)
386 }
387
388 #[inline]
390 pub fn writer_schema(&self) -> &Schema {
391 &self.block.writer_schema
392 }
393
394 #[inline]
396 pub fn reader_schema(&self) -> Option<&Schema> {
397 self.reader_schema
398 }
399
400 #[inline]
402 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
403 &self.block.user_metadata
404 }
405
406 #[inline]
407 fn read_next(&mut self) -> AvroResult<Option<Value>> {
408 let read_schema = if self.should_resolve_schema {
409 self.reader_schema
410 } else {
411 None
412 };
413
414 self.block.read_next(read_schema)
415 }
416}
417
418impl<R: Read> Iterator for Reader<'_, R> {
419 type Item = AvroResult<Value>;
420
421 fn next(&mut self) -> Option<Self::Item> {
422 if self.errored {
424 return None;
425 };
426 match self.read_next() {
427 Ok(opt) => opt.map(Ok),
428 Err(e) => {
429 self.errored = true;
430 Some(Err(e))
431 }
432 }
433 }
434}
435
436pub fn from_avro_datum<R: Read>(
445 writer_schema: &Schema,
446 reader: &mut R,
447 reader_schema: Option<&Schema>,
448) -> AvroResult<Value> {
449 let value = decode(writer_schema, reader)?;
450 match reader_schema {
451 Some(schema) => value.resolve(schema),
452 None => Ok(value),
453 }
454}
455
456pub fn from_avro_datum_schemata<R: Read>(
463 writer_schema: &Schema,
464 writer_schemata: Vec<&Schema>,
465 reader: &mut R,
466 reader_schema: Option<&Schema>,
467) -> AvroResult<Value> {
468 from_avro_datum_reader_schemata(
469 writer_schema,
470 writer_schemata,
471 reader,
472 reader_schema,
473 Vec::with_capacity(0),
474 )
475}
476
477pub fn from_avro_datum_reader_schemata<R: Read>(
484 writer_schema: &Schema,
485 writer_schemata: Vec<&Schema>,
486 reader: &mut R,
487 reader_schema: Option<&Schema>,
488 reader_schemata: Vec<&Schema>,
489) -> AvroResult<Value> {
490 let rs = ResolvedSchema::try_from(writer_schemata)?;
491 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
492 match reader_schema {
493 Some(schema) => {
494 if reader_schemata.is_empty() {
495 value.resolve(schema)
496 } else {
497 value.resolve_schemata(schema, reader_schemata)
498 }
499 }
500 None => Ok(value),
501 }
502}
503
/// Reader for single-object-encoded values: an expected header followed by
/// the datum encoded with a fixed schema.
pub struct GenericSingleObjectReader {
    /// Schema (with its named types resolved) used to decode the payload.
    write_schema: ResolvedOwnedSchema,
    /// Bytes every input must start with, produced by a `HeaderBuilder`.
    expected_header: Vec<u8>,
}
508
509impl GenericSingleObjectReader {
510 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
511 let header_builder = RabinFingerprintHeader::from_schema(&schema);
512 Self::new_with_header_builder(schema, header_builder)
513 }
514
515 pub fn new_with_header_builder<HB: HeaderBuilder>(
516 schema: Schema,
517 header_builder: HB,
518 ) -> AvroResult<GenericSingleObjectReader> {
519 let expected_header = header_builder.build_header();
520 Ok(GenericSingleObjectReader {
521 write_schema: ResolvedOwnedSchema::try_from(schema)?,
522 expected_header,
523 })
524 }
525
526 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
527 let mut header = vec![0; self.expected_header.len()];
528 match reader.read_exact(&mut header) {
529 Ok(_) => {
530 if self.expected_header == header {
531 decode_internal(
532 self.write_schema.get_root_schema(),
533 self.write_schema.get_names(),
534 &None,
535 reader,
536 )
537 } else {
538 Err(Error::SingleObjectHeaderMismatch(
539 self.expected_header.clone(),
540 header,
541 ))
542 }
543 }
544 Err(io_error) => Err(Error::ReadHeader(io_error)),
545 }
546 }
547}
548
/// Single-object reader bound to a Rust type `T` whose schema comes from
/// `T::get_schema()`.
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Untyped reader that performs the header check and decoding.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a `T`.
    _model: PhantomData<T>,
}
556
557impl<T> SpecificSingleObjectReader<T>
558where
559 T: AvroSchema,
560{
561 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
562 Ok(SpecificSingleObjectReader {
563 inner: GenericSingleObjectReader::new(T::get_schema())?,
564 _model: PhantomData,
565 })
566 }
567}
568
569impl<T> SpecificSingleObjectReader<T>
570where
571 T: AvroSchema + From<Value>,
572{
573 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
574 self.inner.read_value(reader).map(|v| v.into())
575 }
576}
577
578impl<T> SpecificSingleObjectReader<T>
579where
580 T: AvroSchema + DeserializeOwned,
581{
582 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
583 from_value::<T>(&self.inner.read_value(reader)?)
584 }
585}
586
/// Returns the 16-byte sync marker stored in the last 16 bytes of `bytes`.
///
/// Only the length is checked; the content of the bytes is not validated.
///
/// # Panics
/// Panics if `bytes` is shorter than 16 bytes. A slice of exactly 16 bytes is
/// accepted, since it holds a complete marker.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    // `u8` is `Copy`, so a plain memcpy is enough.
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
597
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use serde::Deserialize;
    use std::io::Cursor;
    use uuid::Uuid;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// A complete container file (null codec) holding two `SCHEMA` records:
    /// {a: 27, b: "foo"} and {a: 42, b: "bar"}.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);

        Ok(())
    }

    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        // `a_nullable_boolean` is deliberately not mapped; the test relies on
        // the extra schema field being ignored during deserialization.
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None)?,
            Value::Union(1, Box::new(Value::Long(0)))
        );

        Ok(())
    }

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::with_schema(&schema, ENCODED)?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Compare the whole sequence so a missing or extra value also fails.
        let values = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(values, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        // Drop the trailing sync marker and the last 3 payload bytes.
        let invalid = ENCODED
            .iter()
            .copied()
            .rev()
            .skip(19)
            .collect::<Vec<u8>>()
            .into_iter()
            .rev()
            .collect::<Vec<u8>>();
        let reader = Reader::with_schema(&schema, &invalid[..])?;
        let results: Vec<AvroResult<Value>> = reader.collect();
        assert!(
            !results.is_empty(),
            "the truncated block should produce an error"
        );
        assert!(results.iter().all(|value| value.is_err()));

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        // The 163-byte header plus the first block's count and size, but no payload.
        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
        let reader = Reader::new(&invalid[..])?;
        let results: Vec<AvroResult<Value>> = reader.collect();
        assert!(
            !results.is_empty(),
            "the missing block payload should produce an error"
        );
        assert!(results.iter().all(|value| value.is_err()));

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append(record.clone())?;
        writer.append(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    impl From<Value> for TestSingleObjectReader {
        fn from(obj: Value) -> TestSingleObjectReader {
            if let Value::Record(fields) = obj {
                let mut a = None;
                let mut b = None;
                let mut c = vec![];
                for (field_name, v) in fields {
                    match (field_name.as_str(), v) {
                        ("a", Value::Long(i)) => a = Some(i),
                        ("b", Value::Double(d)) => b = Some(d),
                        ("c", Value::Array(v)) => {
                            for inner_val in v {
                                if let Value::String(s) = inner_val {
                                    c.push(s);
                                }
                            }
                        }
                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                    }
                }
                TestSingleObjectReader {
                    a: a.unwrap(),
                    b: b.unwrap(),
                    c,
                }
            } else {
                panic!("Expected a Value::Record but was {obj:?}")
            }
        }
    }

    impl From<TestSingleObjectReader> for Value {
        fn from(obj: TestSingleObjectReader) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let mut to_read = &to_read[..];
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        // Header prefix, fingerprint and payload come from separate readers
        // chained together, so no single `read` call returns the whole input.
        let to_read_1 = [0xC3, 0x01];
        let mut to_read_2 = Vec::<u8>::new();
        to_read_2.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        let mut to_read_3 = Vec::<u8>::new();
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read_3,
        )
        .expect("Encode should succeed");
        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");
        let mut to_read1 = &to_read[..];
        let mut to_read2 = &to_read[..];
        let mut to_read3 = &to_read[..];

        let val = generic_reader
            .read_value(&mut to_read1)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut to_read2)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut to_read3)
            .expect("Should read from deserilize");
        let expected_value: Value = obj.clone().into();
        assert_eq!(obj, read_obj1);
        assert_eq!(obj, read_obj2);
        assert_eq!(val, expected_value);

        Ok(())
    }

    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
            TestSingleObjectReader::get_schema(),
            header_builder,
        )
        .expect("failed to build reader");
        // The `3, 0` prefix followed by the 16 bytes of `schema_uuid`, and no payload.
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
        ];
        let mut to_read = &data_to_read[..];
        let read_result = generic_reader.read_value(&mut to_read);
        // The header must be accepted, so the failure has to come from decoding
        // the missing payload rather than from reading or matching the header.
        assert!(
            matches!(
                read_result,
                Err(ref e) if !matches!(
                    e,
                    Error::ReadHeader(_) | Error::SingleObjectHeaderMismatch(..)
                )
            ),
            "expected a payload decoding error, got {read_result:?}"
        );
        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
        } else {
            panic!("Expected an error in the reading of the codec!");
        }
    }
}