1use crate::{
20 AvroResult, Codec, Error,
21 decode::{decode, decode_internal},
22 error::Details,
23 from_value,
24 headers::{HeaderBuilder, RabinFingerprintHeader},
25 schema::{
26 AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema, resolve_names,
27 resolve_names_with_schemata,
28 },
29 types::Value,
30 util,
31};
32use log::warn;
33use serde::de::DeserializeOwned;
34use serde_json::from_slice;
35use std::{
36 collections::HashMap,
37 io::{ErrorKind, Read},
38 marker::PhantomData,
39 str::FromStr,
40};
41
42#[derive(Debug, Clone)]
44struct Block<'r, R> {
45 reader: R,
46 buf: Vec<u8>,
48 buf_idx: usize,
49 message_count: usize,
51 marker: [u8; 16],
52 codec: Codec,
53 writer_schema: Schema,
54 schemata: Vec<&'r Schema>,
55 user_metadata: HashMap<String, Vec<u8>>,
56 names_refs: Names,
57}
58
59impl<'r, R: Read> Block<'r, R> {
60 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
61 let mut block = Block {
62 reader,
63 codec: Codec::Null,
64 writer_schema: Schema::Null,
65 schemata,
66 buf: vec![],
67 buf_idx: 0,
68 message_count: 0,
69 marker: [0; 16],
70 user_metadata: Default::default(),
71 names_refs: Default::default(),
72 };
73
74 block.read_header()?;
75 Ok(block)
76 }
77
78 fn read_header(&mut self) -> AvroResult<()> {
81 let mut buf = [0u8; 4];
82 self.reader
83 .read_exact(&mut buf)
84 .map_err(Details::ReadHeader)?;
85
86 if buf != [b'O', b'b', b'j', 1u8] {
87 return Err(Details::HeaderMagic.into());
88 }
89
90 let meta_schema = Schema::map(Schema::Bytes);
91 match decode(&meta_schema, &mut self.reader)? {
92 Value::Map(metadata) => {
93 self.read_writer_schema(&metadata)?;
94 self.codec = read_codec(&metadata)?;
95
96 for (key, value) in metadata {
97 if key == "avro.schema"
98 || key == "avro.codec"
99 || key == "avro.codec.compression_level"
100 {
101 } else if key.starts_with("avro.") {
103 warn!("Ignoring unknown metadata key: {key}");
104 } else {
105 self.read_user_metadata(key, value);
106 }
107 }
108 }
109 _ => {
110 return Err(Details::GetHeaderMetadata.into());
111 }
112 }
113
114 self.reader
115 .read_exact(&mut self.marker)
116 .map_err(|e| Details::ReadMarker(e).into())
117 }
118
119 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
120 self.buf.resize(util::safe_len(n)?, 0);
132 self.reader
133 .read_exact(&mut self.buf)
134 .map_err(Details::ReadIntoBuf)?;
135 self.buf_idx = 0;
136 Ok(())
137 }
138
139 fn read_block_next(&mut self) -> AvroResult<()> {
142 assert!(self.is_empty(), "Expected self to be empty!");
143 match util::read_long(&mut self.reader).map_err(Error::into_details) {
144 Ok(block_len) => {
145 self.message_count = block_len as usize;
146 let block_bytes = util::read_long(&mut self.reader)?;
147 self.fill_buf(block_bytes as usize)?;
148 let mut marker = [0u8; 16];
149 self.reader
150 .read_exact(&mut marker)
151 .map_err(Details::ReadBlockMarker)?;
152
153 if marker != self.marker {
154 return Err(Details::GetBlockMarker.into());
155 }
156
157 self.codec.decompress(&mut self.buf)
164 }
165 Err(Details::ReadVariableIntegerBytes(io_err)) => {
166 if let ErrorKind::UnexpectedEof = io_err.kind() {
167 Ok(())
169 } else {
170 Err(Details::ReadVariableIntegerBytes(io_err).into())
171 }
172 }
173 Err(e) => Err(Error::new(e)),
174 }
175 }
176
177 fn len(&self) -> usize {
178 self.message_count
179 }
180
181 fn is_empty(&self) -> bool {
182 self.len() == 0
183 }
184
185 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
186 if self.is_empty() {
187 self.read_block_next()?;
188 if self.is_empty() {
189 return Ok(None);
190 }
191 }
192
193 let mut block_bytes = &self.buf[self.buf_idx..];
194 let b_original = block_bytes.len();
195
196 let item = decode_internal(
197 &self.writer_schema,
198 &self.names_refs,
199 &None,
200 &mut block_bytes,
201 )?;
202 let item = match read_schema {
203 Some(schema) => item.resolve(schema)?,
204 None => item,
205 };
206
207 if b_original != 0 && b_original == block_bytes.len() {
208 return Err(Details::ReadBlock.into());
210 }
211 self.buf_idx += b_original - block_bytes.len();
212 self.message_count -= 1;
213 Ok(Some(item))
214 }
215
216 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
217 let json: serde_json::Value = metadata
218 .get("avro.schema")
219 .and_then(|bytes| {
220 if let Value::Bytes(ref bytes) = *bytes {
221 from_slice(bytes.as_ref()).ok()
222 } else {
223 None
224 }
225 })
226 .ok_or(Details::GetAvroSchemaFromMap)?;
227 if !self.schemata.is_empty() {
228 let rs = ResolvedSchema::try_from(self.schemata.clone())?;
229 let names: Names = rs
230 .get_names()
231 .iter()
232 .map(|(name, schema)| (name.clone(), (*schema).clone()))
233 .collect();
234 self.writer_schema = Schema::parse_with_names(&json, names)?;
235 resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
236 } else {
237 self.writer_schema = Schema::parse(&json)?;
238 resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
239 }
240 Ok(())
241 }
242
243 fn read_user_metadata(&mut self, key: String, value: Value) {
244 match value {
245 Value::Bytes(ref vec) => {
246 self.user_metadata.insert(key, vec.clone());
247 }
248 wrong => {
249 warn!("User metadata values must be Value::Bytes, found {wrong:?}");
250 }
251 }
252 }
253}
254
255fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
256 let result = metadata
257 .get("avro.codec")
258 .map(|codec| {
259 if let Value::Bytes(ref bytes) = *codec {
260 match std::str::from_utf8(bytes.as_ref()) {
261 Ok(utf8) => Ok(utf8),
262 Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
263 }
264 } else {
265 Err(Details::BadCodecMetadata.into())
266 }
267 })
268 .map(|codec_res| match codec_res {
269 Ok(codec) => match Codec::from_str(codec) {
270 Ok(codec) => match codec {
271 #[cfg(feature = "bzip")]
272 Codec::Bzip2(_) => {
273 use crate::Bzip2Settings;
274 if let Some(Value::Bytes(bytes)) =
275 metadata.get("avro.codec.compression_level")
276 {
277 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
278 } else {
279 Ok(codec)
280 }
281 }
282 #[cfg(feature = "xz")]
283 Codec::Xz(_) => {
284 use crate::XzSettings;
285 if let Some(Value::Bytes(bytes)) =
286 metadata.get("avro.codec.compression_level")
287 {
288 Ok(Codec::Xz(XzSettings::new(bytes[0])))
289 } else {
290 Ok(codec)
291 }
292 }
293 #[cfg(feature = "zstandard")]
294 Codec::Zstandard(_) => {
295 use crate::ZstandardSettings;
296 if let Some(Value::Bytes(bytes)) =
297 metadata.get("avro.codec.compression_level")
298 {
299 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
300 } else {
301 Ok(codec)
302 }
303 }
304 _ => Ok(codec),
305 },
306 Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
307 },
308 Err(err) => Err(err),
309 });
310
311 result.unwrap_or(Ok(Codec::Null))
312}
313
314pub struct Reader<'a, R> {
330 block: Block<'a, R>,
331 reader_schema: Option<&'a Schema>,
332 errored: bool,
333 should_resolve_schema: bool,
334}
335
336impl<'a, R: Read> Reader<'a, R> {
337 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
342 let block = Block::new(reader, vec![])?;
343 let reader = Reader {
344 block,
345 reader_schema: None,
346 errored: false,
347 should_resolve_schema: false,
348 };
349 Ok(reader)
350 }
351
352 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
357 let block = Block::new(reader, vec![schema])?;
358 let mut reader = Reader {
359 block,
360 reader_schema: Some(schema),
361 errored: false,
362 should_resolve_schema: false,
363 };
364 reader.should_resolve_schema = reader.writer_schema() != schema;
366 Ok(reader)
367 }
368
369 pub fn with_schemata(
374 schema: &'a Schema,
375 schemata: Vec<&'a Schema>,
376 reader: R,
377 ) -> AvroResult<Reader<'a, R>> {
378 let block = Block::new(reader, schemata)?;
379 let mut reader = Reader {
380 block,
381 reader_schema: Some(schema),
382 errored: false,
383 should_resolve_schema: false,
384 };
385 reader.should_resolve_schema = reader.writer_schema() != schema;
387 Ok(reader)
388 }
389
390 #[inline]
392 pub fn writer_schema(&self) -> &Schema {
393 &self.block.writer_schema
394 }
395
396 #[inline]
398 pub fn reader_schema(&self) -> Option<&Schema> {
399 self.reader_schema
400 }
401
402 #[inline]
404 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
405 &self.block.user_metadata
406 }
407
408 #[inline]
409 fn read_next(&mut self) -> AvroResult<Option<Value>> {
410 let read_schema = if self.should_resolve_schema {
411 self.reader_schema
412 } else {
413 None
414 };
415
416 self.block.read_next(read_schema)
417 }
418}
419
420impl<R: Read> Iterator for Reader<'_, R> {
421 type Item = AvroResult<Value>;
422
423 fn next(&mut self) -> Option<Self::Item> {
424 if self.errored {
426 return None;
427 };
428 match self.read_next() {
429 Ok(opt) => opt.map(Ok),
430 Err(e) => {
431 self.errored = true;
432 Some(Err(e))
433 }
434 }
435 }
436}
437
438pub fn from_avro_datum<R: Read>(
447 writer_schema: &Schema,
448 reader: &mut R,
449 reader_schema: Option<&Schema>,
450) -> AvroResult<Value> {
451 let value = decode(writer_schema, reader)?;
452 match reader_schema {
453 Some(schema) => value.resolve(schema),
454 None => Ok(value),
455 }
456}
457
458pub fn from_avro_datum_schemata<R: Read>(
465 writer_schema: &Schema,
466 writer_schemata: Vec<&Schema>,
467 reader: &mut R,
468 reader_schema: Option<&Schema>,
469) -> AvroResult<Value> {
470 from_avro_datum_reader_schemata(
471 writer_schema,
472 writer_schemata,
473 reader,
474 reader_schema,
475 Vec::with_capacity(0),
476 )
477}
478
479pub fn from_avro_datum_reader_schemata<R: Read>(
486 writer_schema: &Schema,
487 writer_schemata: Vec<&Schema>,
488 reader: &mut R,
489 reader_schema: Option<&Schema>,
490 reader_schemata: Vec<&Schema>,
491) -> AvroResult<Value> {
492 let rs = ResolvedSchema::try_from(writer_schemata)?;
493 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
494 match reader_schema {
495 Some(schema) => {
496 if reader_schemata.is_empty() {
497 value.resolve(schema)
498 } else {
499 value.resolve_schemata(schema, reader_schemata)
500 }
501 }
502 None => Ok(value),
503 }
504}
505
506pub struct GenericSingleObjectReader {
507 write_schema: ResolvedOwnedSchema,
508 expected_header: Vec<u8>,
509}
510
511impl GenericSingleObjectReader {
512 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
513 let header_builder = RabinFingerprintHeader::from_schema(&schema);
514 Self::new_with_header_builder(schema, header_builder)
515 }
516
517 pub fn new_with_header_builder<HB: HeaderBuilder>(
518 schema: Schema,
519 header_builder: HB,
520 ) -> AvroResult<GenericSingleObjectReader> {
521 let expected_header = header_builder.build_header();
522 Ok(GenericSingleObjectReader {
523 write_schema: ResolvedOwnedSchema::try_from(schema)?,
524 expected_header,
525 })
526 }
527
528 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
529 let mut header = vec![0; self.expected_header.len()];
530 match reader.read_exact(&mut header) {
531 Ok(_) => {
532 if self.expected_header == header {
533 decode_internal(
534 self.write_schema.get_root_schema(),
535 self.write_schema.get_names(),
536 &None,
537 reader,
538 )
539 } else {
540 Err(
541 Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
542 .into(),
543 )
544 }
545 }
546 Err(io_error) => Err(Details::ReadHeader(io_error).into()),
547 }
548 }
549}
550
551pub struct SpecificSingleObjectReader<T>
552where
553 T: AvroSchema,
554{
555 inner: GenericSingleObjectReader,
556 _model: PhantomData<T>,
557}
558
559impl<T> SpecificSingleObjectReader<T>
560where
561 T: AvroSchema,
562{
563 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
564 Ok(SpecificSingleObjectReader {
565 inner: GenericSingleObjectReader::new(T::get_schema())?,
566 _model: PhantomData,
567 })
568 }
569}
570
571impl<T> SpecificSingleObjectReader<T>
572where
573 T: AvroSchema + From<Value>,
574{
575 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
576 self.inner.read_value(reader).map(|v| v.into())
577 }
578}
579
580impl<T> SpecificSingleObjectReader<T>
581where
582 T: AvroSchema + DeserializeOwned,
583{
584 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
585 from_value::<T>(&self.inner.read_value(reader)?)
586 }
587}
588
/// Returns the last 16 bytes of `bytes`.
///
/// For a complete Avro container file these bytes are typically the sync marker that
/// terminates the final block.
///
/// # Panics
///
/// Panics if `bytes` is shorter than 16 bytes. An input of exactly 16 bytes is accepted;
/// the previous `> 16` check rejected it even though it holds a full marker.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
598}
599
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use serde::Deserialize;
    use std::io::Cursor;
    use uuid::Uuid;

    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {
                "name": "a",
                "type": "long",
                "default": 42
            },
            {
                "name": "b",
                "type": "string"
            }
        ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// Container file with two `SCHEMA` records, (27, "foo") and (42, "bar"), in one
    /// null-codec block.
    ///
    /// Layout: a 163-byte header (magic, metadata, sync marker), then the block's record
    /// count and byte size varints, 10 data bytes, and the 16-byte sync marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);

        Ok(())
    }

    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "a",
                    "type": "long",
                    "default": 42
                },
                {
                    "name": "b",
                    "type": "string"
                },
                {
                    "name": "a_nullable_array",
                    "type": ["null", {"type": "array", "items": {"type": "string"}}],
                    "default": null
                },
                {
                    "name": "a_nullable_boolean",
                    "type": ["null", {"type": "boolean"}],
                    "default": null
                },
                {
                    "name": "a_nullable_string",
                    "type": ["null", {"type": "string"}],
                    "default": null
                }
            ]
        }
        "#;
        // The schema's `a_nullable_boolean` field deliberately has no counterpart here;
        // deserialization is expected to skip it.
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None)?,
            Value::Union(1, Box::new(Value::Long(0)))
        );

        Ok(())
    }

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::with_schema(&schema, ENCODED)?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Collecting pins the number of records too, so an empty iterator cannot pass.
        let values = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(values, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
        assert!(Reader::with_schema(&schema, &invalid[..]).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        // Dropping the last 19 bytes removes the trailing sync marker and 3 data bytes.
        let invalid = ENCODED
            .iter()
            .copied()
            .rev()
            .skip(19)
            .collect::<Vec<u8>>()
            .into_iter()
            .rev()
            .collect::<Vec<u8>>();
        let reader = Reader::with_schema(&schema, &invalid[..])?;
        let results: Vec<AvroResult<Value>> = reader.collect();
        assert!(!results.is_empty(), "a truncated block must yield an error");
        assert!(results.iter().all(|result| result.is_err()));

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        // 165 bytes: the 163-byte header plus the block's count and size varints, no data.
        let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
        let reader = Reader::new(&invalid[..])?;
        let results: Vec<AvroResult<Value>> = reader.collect();
        assert!(!results.is_empty(), "a block without data must yield an error");
        assert!(results.iter().all(|result| result.is_err()));

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new());

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append(record.clone())?;
        writer.append(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    impl From<Value> for TestSingleObjectReader {
        fn from(obj: Value) -> TestSingleObjectReader {
            if let Value::Record(fields) = obj {
                let mut a = None;
                let mut b = None;
                let mut c = vec![];
                for (field_name, v) in fields {
                    match (field_name.as_str(), v) {
                        ("a", Value::Long(i)) => a = Some(i),
                        ("b", Value::Double(d)) => b = Some(d),
                        ("c", Value::Array(v)) => {
                            for inner_val in v {
                                if let Value::String(s) = inner_val {
                                    c.push(s);
                                }
                            }
                        }
                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                    }
                }
                TestSingleObjectReader {
                    a: a.unwrap(),
                    b: b.unwrap(),
                    c,
                }
            } else {
                panic!("Expected a Value::Record but was {obj:?}")
            }
        }
    }

    impl From<TestSingleObjectReader> for Value {
        fn from(obj: TestSingleObjectReader) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let mut to_read = &to_read[..];
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        // The header and payload are spread over three chained readers.
        let to_read_1 = [0xC3, 0x01];
        let mut to_read_2 = Vec::<u8>::new();
        to_read_2.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        let mut to_read_3 = Vec::<u8>::new();
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read_3,
        )
        .expect("Encode should succeed");
        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");
        let mut to_read1 = &to_read[..];
        let mut to_read2 = &to_read[..];
        let mut to_read3 = &to_read[..];

        let val = generic_reader
            .read_value(&mut to_read1)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut to_read2)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut to_read3)
            .expect("Should read from deserilize");
        let expected_value: Value = obj.clone().into();
        assert_eq!(obj, read_obj1);
        assert_eq!(obj, read_obj2);
        assert_eq!(val, expected_value);

        Ok(())
    }

    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
            TestSingleObjectReader::get_schema(),
            header_builder,
        )
        .expect("failed to build reader");
        // Two leading bytes followed by the 16 UUID bytes and no payload. This is presumably
        // exactly the Glue header, so the header check should pass and decoding should then
        // fail for lack of data.
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
        ];
        let mut to_read = &data_to_read[..];
        let read_result = generic_reader
            .read_value(&mut to_read)
            .map_err(Error::into_details);
        // Previously a bare `matches!(..);` whose result was discarded, so nothing was checked.
        assert!(
            matches!(
                &read_result,
                Err(details) if !matches!(details, Details::SingleObjectHeaderMismatch(..))
            ),
            "expected a decoding error after a matching header, got {read_result:?}"
        );
        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
        } else {
            panic!("Expected an error in the reading of the codec!");
        }
    }
}