1use crate::{
20 decode::{decode, decode_internal},
21 from_value,
22 rabin::Rabin,
23 schema::{
24 resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
25 ResolvedSchema, Schema,
26 },
27 types::Value,
28 util, AvroResult, Codec, Error,
29};
30use log::warn;
31use serde::de::DeserializeOwned;
32use serde_json::from_slice;
33use std::{
34 collections::HashMap,
35 io::{ErrorKind, Read},
36 marker::PhantomData,
37 str::FromStr,
38};
39
/// Internal state used to read an Avro object container stream one block at a time.
#[derive(Debug, Clone)]
struct Block<'r, R> {
    /// Underlying byte source.
    reader: R,
    /// Bytes of the block currently being consumed (decompressed in place
    /// after they are read).
    buf: Vec<u8>,
    /// Offset into `buf` at which the next datum starts.
    buf_idx: usize,
    /// Number of datums of the current block that have not been returned yet.
    message_count: usize,
    /// Sync marker read from the header; each block's trailing marker is
    /// compared against it.
    marker: [u8; 16],
    /// Codec taken from the header metadata (`Codec::Null` when not present).
    codec: Codec,
    /// Schema parsed from the `avro.schema` header entry.
    writer_schema: Schema,
    /// Extra schemata supplied by the caller; used while parsing the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header metadata entries whose key does not start with `avro.`.
    user_metadata: HashMap<String, Vec<u8>>,
    /// Named schemas handed to the decoder for every datum.
    names_refs: Names,
}
56
57impl<'r, R: Read> Block<'r, R> {
58 fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<'r, R>> {
59 let mut block = Block {
60 reader,
61 codec: Codec::Null,
62 writer_schema: Schema::Null,
63 schemata,
64 buf: vec![],
65 buf_idx: 0,
66 message_count: 0,
67 marker: [0; 16],
68 user_metadata: Default::default(),
69 names_refs: Default::default(),
70 };
71
72 block.read_header()?;
73 Ok(block)
74 }
75
76 fn read_header(&mut self) -> AvroResult<()> {
79 let mut buf = [0u8; 4];
80 self.reader
81 .read_exact(&mut buf)
82 .map_err(Error::ReadHeader)?;
83
84 if buf != [b'O', b'b', b'j', 1u8] {
85 return Err(Error::HeaderMagic);
86 }
87
88 let meta_schema = Schema::map(Schema::Bytes);
89 if let Value::Map(metadata) = decode(&meta_schema, &mut self.reader)? {
90 self.read_writer_schema(&metadata)?;
91 self.codec = read_codec(&metadata)?;
92
93 for (key, value) in metadata {
94 if key == "avro.schema"
95 || key == "avro.codec"
96 || key == "avro.codec.compression_level"
97 {
98 } else if key.starts_with("avro.") {
100 warn!("Ignoring unknown metadata key: {}", key);
101 } else {
102 self.read_user_metadata(key, value);
103 }
104 }
105 } else {
106 return Err(Error::GetHeaderMetadata);
107 }
108
109 self.reader
110 .read_exact(&mut self.marker)
111 .map_err(Error::ReadMarker)
112 }
113
114 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
115 self.buf.resize(util::safe_len(n)?, 0);
127 self.reader
128 .read_exact(&mut self.buf)
129 .map_err(Error::ReadIntoBuf)?;
130 self.buf_idx = 0;
131 Ok(())
132 }
133
134 fn read_block_next(&mut self) -> AvroResult<()> {
137 assert!(self.is_empty(), "Expected self to be empty!");
138 match util::read_long(&mut self.reader) {
139 Ok(block_len) => {
140 self.message_count = block_len as usize;
141 let block_bytes = util::read_long(&mut self.reader)?;
142 self.fill_buf(block_bytes as usize)?;
143 let mut marker = [0u8; 16];
144 self.reader
145 .read_exact(&mut marker)
146 .map_err(Error::ReadBlockMarker)?;
147
148 if marker != self.marker {
149 return Err(Error::GetBlockMarker);
150 }
151
152 self.codec.decompress(&mut self.buf)
159 }
160 Err(Error::ReadVariableIntegerBytes(io_err)) => {
161 if let ErrorKind::UnexpectedEof = io_err.kind() {
162 Ok(())
164 } else {
165 Err(Error::ReadVariableIntegerBytes(io_err))
166 }
167 }
168 Err(e) => Err(e),
169 }
170 }
171
    /// Number of datums of the current block that have not been returned yet.
    fn len(&self) -> usize {
        self.message_count
    }
175
176 fn is_empty(&self) -> bool {
177 self.len() == 0
178 }
179
180 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
181 if self.is_empty() {
182 self.read_block_next()?;
183 if self.is_empty() {
184 return Ok(None);
185 }
186 }
187
188 let mut block_bytes = &self.buf[self.buf_idx..];
189 let b_original = block_bytes.len();
190
191 let item = decode_internal(
192 &self.writer_schema,
193 &self.names_refs,
194 &None,
195 &mut block_bytes,
196 )?;
197 let item = match read_schema {
198 Some(schema) => item.resolve(schema)?,
199 None => item,
200 };
201
202 if b_original != 0 && b_original == block_bytes.len() {
203 return Err(Error::ReadBlock);
205 }
206 self.buf_idx += b_original - block_bytes.len();
207 self.message_count -= 1;
208 Ok(Some(item))
209 }
210
211 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
212 let json: serde_json::Value = metadata
213 .get("avro.schema")
214 .and_then(|bytes| {
215 if let Value::Bytes(ref bytes) = *bytes {
216 from_slice(bytes.as_ref()).ok()
217 } else {
218 None
219 }
220 })
221 .ok_or(Error::GetAvroSchemaFromMap)?;
222 if !self.schemata.is_empty() {
223 let rs = ResolvedSchema::try_from(self.schemata.clone())?;
224 let names: Names = rs
225 .get_names()
226 .iter()
227 .map(|(name, schema)| (name.clone(), (*schema).clone()))
228 .collect();
229 self.writer_schema = Schema::parse_with_names(&json, names)?;
230 resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
231 } else {
232 self.writer_schema = Schema::parse(&json)?;
233 resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
234 }
235 Ok(())
236 }
237
238 fn read_user_metadata(&mut self, key: String, value: Value) {
239 match value {
240 Value::Bytes(ref vec) => {
241 self.user_metadata.insert(key, vec.clone());
242 }
243 wrong => {
244 warn!(
245 "User metadata values must be Value::Bytes, found {:?}",
246 wrong
247 );
248 }
249 }
250 }
251}
252
253fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
254 let result = metadata
255 .get("avro.codec")
256 .map(|codec| {
257 if let Value::Bytes(ref bytes) = *codec {
258 match std::str::from_utf8(bytes.as_ref()) {
259 Ok(utf8) => Ok(utf8),
260 Err(utf8_error) => Err(Error::ConvertToUtf8Error(utf8_error)),
261 }
262 } else {
263 Err(Error::BadCodecMetadata)
264 }
265 })
266 .map(|codec_res| match codec_res {
267 Ok(codec) => match Codec::from_str(codec) {
268 Ok(codec) => match codec {
269 #[cfg(feature = "bzip")]
270 Codec::Bzip2(_) => {
271 use crate::Bzip2Settings;
272 if let Some(Value::Bytes(bytes)) =
273 metadata.get("avro.codec.compression_level")
274 {
275 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
276 } else {
277 Ok(codec)
278 }
279 }
280 #[cfg(feature = "xz")]
281 Codec::Xz(_) => {
282 use crate::XzSettings;
283 if let Some(Value::Bytes(bytes)) =
284 metadata.get("avro.codec.compression_level")
285 {
286 Ok(Codec::Xz(XzSettings::new(bytes[0])))
287 } else {
288 Ok(codec)
289 }
290 }
291 #[cfg(feature = "zstandard")]
292 Codec::Zstandard(_) => {
293 use crate::ZstandardSettings;
294 if let Some(Value::Bytes(bytes)) =
295 metadata.get("avro.codec.compression_level")
296 {
297 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
298 } else {
299 Ok(codec)
300 }
301 }
302 _ => Ok(codec),
303 },
304 Err(_) => Err(Error::CodecNotSupported(codec.to_owned())),
305 },
306 Err(err) => Err(err),
307 });
308
309 result.unwrap_or(Ok(Codec::Null))
310}
311
/// Reader over an Avro object container stream; iterating it yields one
/// `AvroResult<Value>` per datum.
pub struct Reader<'a, R> {
    /// Block-level state: header data plus the block currently being decoded.
    block: Block<'a, R>,
    /// Schema supplied by the caller, if any.
    reader_schema: Option<&'a Schema>,
    /// Set once iteration has produced an error; afterwards iteration stops.
    errored: bool,
    /// Whether decoded values are resolved against `reader_schema`.
    should_resolve_schema: bool,
}
333
334impl<'a, R: Read> Reader<'a, R> {
335 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
340 let block = Block::new(reader, vec![])?;
341 let reader = Reader {
342 block,
343 reader_schema: None,
344 errored: false,
345 should_resolve_schema: false,
346 };
347 Ok(reader)
348 }
349
350 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
355 let block = Block::new(reader, vec![schema])?;
356 let mut reader = Reader {
357 block,
358 reader_schema: Some(schema),
359 errored: false,
360 should_resolve_schema: false,
361 };
362 reader.should_resolve_schema = reader.writer_schema() != schema;
364 Ok(reader)
365 }
366
367 pub fn with_schemata(
372 schema: &'a Schema,
373 schemata: Vec<&'a Schema>,
374 reader: R,
375 ) -> AvroResult<Reader<'a, R>> {
376 let block = Block::new(reader, schemata)?;
377 let mut reader = Reader {
378 block,
379 reader_schema: Some(schema),
380 errored: false,
381 should_resolve_schema: false,
382 };
383 reader.should_resolve_schema = reader.writer_schema() != schema;
385 Ok(reader)
386 }
387
    /// Returns the writer schema that was parsed from the container header.
    #[inline]
    pub fn writer_schema(&self) -> &Schema {
        &self.block.writer_schema
    }
393
    /// Returns the reader schema supplied at construction, or `None` when the
    /// reader was created without one.
    #[inline]
    pub fn reader_schema(&self) -> Option<&Schema> {
        self.reader_schema
    }
399
    /// Returns the user metadata collected from the container header, i.e.
    /// the bytes-valued entries whose key does not start with `avro.`.
    #[inline]
    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
        &self.block.user_metadata
    }
405
406 #[inline]
407 fn read_next(&mut self) -> AvroResult<Option<Value>> {
408 let read_schema = if self.should_resolve_schema {
409 self.reader_schema
410 } else {
411 None
412 };
413
414 self.block.read_next(read_schema)
415 }
416}
417
418impl<R: Read> Iterator for Reader<'_, R> {
419 type Item = AvroResult<Value>;
420
421 fn next(&mut self) -> Option<Self::Item> {
422 if self.errored {
424 return None;
425 };
426 match self.read_next() {
427 Ok(opt) => opt.map(Ok),
428 Err(e) => {
429 self.errored = true;
430 Some(Err(e))
431 }
432 }
433 }
434}
435
436pub fn from_avro_datum<R: Read>(
445 writer_schema: &Schema,
446 reader: &mut R,
447 reader_schema: Option<&Schema>,
448) -> AvroResult<Value> {
449 let value = decode(writer_schema, reader)?;
450 match reader_schema {
451 Some(schema) => value.resolve(schema),
452 None => Ok(value),
453 }
454}
455
456pub fn from_avro_datum_schemata<R: Read>(
463 writer_schema: &Schema,
464 writer_schemata: Vec<&Schema>,
465 reader: &mut R,
466 reader_schema: Option<&Schema>,
467) -> AvroResult<Value> {
468 from_avro_datum_reader_schemata(
469 writer_schema,
470 writer_schemata,
471 reader,
472 reader_schema,
473 Vec::with_capacity(0),
474 )
475}
476
477pub fn from_avro_datum_reader_schemata<R: Read>(
484 writer_schema: &Schema,
485 writer_schemata: Vec<&Schema>,
486 reader: &mut R,
487 reader_schema: Option<&Schema>,
488 reader_schemata: Vec<&Schema>,
489) -> AvroResult<Value> {
490 let rs = ResolvedSchema::try_from(writer_schemata)?;
491 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
492 match reader_schema {
493 Some(schema) => {
494 if reader_schemata.is_empty() {
495 value.resolve(schema)
496 } else {
497 value.resolve_schemata(schema, reader_schemata)
498 }
499 }
500 None => Ok(value),
501 }
502}
503
/// Reader for single-object encoded values: a 10-byte header followed by the
/// encoded datum.
pub struct GenericSingleObjectReader {
    /// Schema (with its resolved names) used to decode the datum.
    write_schema: ResolvedOwnedSchema,
    /// Header every message must start with: `0xC3 0x01` followed by the
    /// first eight bytes of the schema's Rabin fingerprint.
    expected_header: [u8; 10],
}
508
509impl GenericSingleObjectReader {
510 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
511 let fingerprint = schema.fingerprint::<Rabin>();
512 let expected_header = [
513 0xC3,
514 0x01,
515 fingerprint.bytes[0],
516 fingerprint.bytes[1],
517 fingerprint.bytes[2],
518 fingerprint.bytes[3],
519 fingerprint.bytes[4],
520 fingerprint.bytes[5],
521 fingerprint.bytes[6],
522 fingerprint.bytes[7],
523 ];
524 Ok(GenericSingleObjectReader {
525 write_schema: ResolvedOwnedSchema::try_from(schema)?,
526 expected_header,
527 })
528 }
529
530 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
531 let mut header: [u8; 10] = [0; 10];
532 match reader.read_exact(&mut header) {
533 Ok(_) => {
534 if self.expected_header == header {
535 decode_internal(
536 self.write_schema.get_root_schema(),
537 self.write_schema.get_names(),
538 &None,
539 reader,
540 )
541 } else {
542 Err(Error::SingleObjectHeaderMismatch(
543 self.expected_header,
544 header,
545 ))
546 }
547 }
548 Err(io_error) => Err(Error::ReadHeader(io_error)),
549 }
550 }
551}
552
/// Typed wrapper around [`GenericSingleObjectReader`] whose schema is taken
/// from `T::get_schema()`.
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Untyped reader that validates the header and decodes the value.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a `T`.
    _model: PhantomData<T>,
}
560
561impl<T> SpecificSingleObjectReader<T>
562where
563 T: AvroSchema,
564{
565 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
566 Ok(SpecificSingleObjectReader {
567 inner: GenericSingleObjectReader::new(T::get_schema())?,
568 _model: PhantomData,
569 })
570 }
571}
572
573impl<T> SpecificSingleObjectReader<T>
574where
575 T: AvroSchema + From<Value>,
576{
577 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
578 self.inner.read_value(reader).map(|v| v.into())
579 }
580}
581
582impl<T> SpecificSingleObjectReader<T>
583where
584 T: AvroSchema + DeserializeOwned,
585{
586 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
587 from_value::<T>(&self.inner.read_value(reader)?)
588 }
589}
590
/// Returns the last 16 bytes of `bytes` as a marker.
///
/// # Panics
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    // A slice of exactly 16 bytes is a complete marker, so the bound is
    // inclusive.
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
601
602#[cfg(test)]
603mod tests {
604 use super::*;
605 use crate::{encode::encode, types::Record};
606 use apache_avro_test_helper::TestResult;
607 use pretty_assertions::assert_eq;
608 use serde::Deserialize;
609 use std::io::Cursor;
610
    // Record schema shared by the tests: a long field `a` (default 42) and a
    // string field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    // Union schema with a `null` branch (index 0) and a `long` branch (index 1).
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    // A complete object container file: the `Obj\x01` magic, a metadata map
    // holding `avro.schema` and `avro.codec` ("null"), a 16-byte sync marker,
    // then one block with two records (27/"foo" and 42/"bar") followed by the
    // same marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];
645
646 #[test]
647 fn test_from_avro_datum() -> TestResult {
648 let schema = Schema::parse_str(SCHEMA)?;
649 let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
650
651 let mut record = Record::new(&schema).unwrap();
652 record.put("a", 27i64);
653 record.put("b", "foo");
654 let expected = record.into();
655
656 assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);
657
658 Ok(())
659 }
660
661 #[test]
662 fn test_from_avro_datum_with_union_to_struct() -> TestResult {
663 const TEST_RECORD_SCHEMA_3240: &str = r#"
664 {
665 "type": "record",
666 "name": "test",
667 "fields": [
668 {
669 "name": "a",
670 "type": "long",
671 "default": 42
672 },
673 {
674 "name": "b",
675 "type": "string"
676 },
677 {
678 "name": "a_nullable_array",
679 "type": ["null", {"type": "array", "items": {"type": "string"}}],
680 "default": null
681 },
682 {
683 "name": "a_nullable_boolean",
684 "type": ["null", {"type": "boolean"}],
685 "default": null
686 },
687 {
688 "name": "a_nullable_string",
689 "type": ["null", {"type": "string"}],
690 "default": null
691 }
692 ]
693 }
694 "#;
695 #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
696 struct TestRecord3240 {
697 a: i64,
698 b: String,
699 a_nullable_array: Option<Vec<String>>,
700 a_nullable_string: Option<String>,
703 }
704
705 let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
706 let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
707
708 let expected_record: TestRecord3240 = TestRecord3240 {
709 a: 27i64,
710 b: String::from("foo"),
711 a_nullable_array: None,
712 a_nullable_string: None,
713 };
714
715 let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
716 let parsed_record: TestRecord3240 = match &avro_datum {
717 Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
718 unexpected => {
719 panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
720 }
721 };
722
723 assert_eq!(parsed_record, expected_record);
724
725 Ok(())
726 }
727
728 #[test]
729 fn test_null_union() -> TestResult {
730 let schema = Schema::parse_str(UNION_SCHEMA)?;
731 let mut encoded: &'static [u8] = &[2, 0];
732
733 assert_eq!(
734 from_avro_datum(&schema, &mut encoded, None)?,
735 Value::Union(1, Box::new(Value::Long(0)))
736 );
737
738 Ok(())
739 }
740
741 #[test]
742 fn test_reader_iterator() -> TestResult {
743 let schema = Schema::parse_str(SCHEMA)?;
744 let reader = Reader::with_schema(&schema, ENCODED)?;
745
746 let mut record1 = Record::new(&schema).unwrap();
747 record1.put("a", 27i64);
748 record1.put("b", "foo");
749
750 let mut record2 = Record::new(&schema).unwrap();
751 record2.put("a", 42i64);
752 record2.put("b", "bar");
753
754 let expected = [record1.into(), record2.into()];
755
756 for (i, value) in reader.enumerate() {
757 assert_eq!(value?, expected[i]);
758 }
759
760 Ok(())
761 }
762
763 #[test]
764 fn test_reader_invalid_header() -> TestResult {
765 let schema = Schema::parse_str(SCHEMA)?;
766 let invalid = ENCODED.iter().copied().skip(1).collect::<Vec<u8>>();
767 assert!(Reader::with_schema(&schema, &invalid[..]).is_err());
768
769 Ok(())
770 }
771
772 #[test]
773 fn test_reader_invalid_block() -> TestResult {
774 let schema = Schema::parse_str(SCHEMA)?;
775 let invalid = ENCODED
776 .iter()
777 .copied()
778 .rev()
779 .skip(19)
780 .collect::<Vec<u8>>()
781 .into_iter()
782 .rev()
783 .collect::<Vec<u8>>();
784 let reader = Reader::with_schema(&schema, &invalid[..])?;
785 for value in reader {
786 assert!(value.is_err());
787 }
788
789 Ok(())
790 }
791
792 #[test]
793 fn test_reader_empty_buffer() -> TestResult {
794 let empty = Cursor::new(Vec::new());
795 assert!(Reader::new(empty).is_err());
796
797 Ok(())
798 }
799
800 #[test]
801 fn test_reader_only_header() -> TestResult {
802 let invalid = ENCODED.iter().copied().take(165).collect::<Vec<u8>>();
803 let reader = Reader::new(&invalid[..])?;
804 for value in reader {
805 assert!(value.is_err());
806 }
807
808 Ok(())
809 }
810
811 #[test]
812 fn test_avro_3405_read_user_metadata_success() -> TestResult {
813 use crate::writer::Writer;
814
815 let schema = Schema::parse_str(SCHEMA)?;
816 let mut writer = Writer::new(&schema, Vec::new());
817
818 let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
819 user_meta_data.insert(
820 "stringKey".to_string(),
821 "stringValue".to_string().into_bytes(),
822 );
823 user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
824 user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);
825
826 for (k, v) in user_meta_data.iter() {
827 writer.add_user_metadata(k.to_string(), v)?;
828 }
829
830 let mut record = Record::new(&schema).unwrap();
831 record.put("a", 27i64);
832 record.put("b", "foo");
833
834 writer.append(record.clone())?;
835 writer.append(record.clone())?;
836 writer.flush()?;
837 let result = writer.into_inner()?;
838
839 let reader = Reader::new(&result[..])?;
840 assert_eq!(reader.user_metadata(), &user_meta_data);
841
842 Ok(())
843 }
844
    /// Test model for the single-object readers; its fields mirror the record
    /// schema returned by its `AvroSchema` implementation.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
851
    impl AvroSchema for TestSingleObjectReader {
        /// Record schema with a long `a`, a double `b` and a string array `c`.
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            // The literal above is fixed, so a parse failure is a bug in the test.
            Schema::parse_str(schema).unwrap()
        }
    }
880
881 impl From<Value> for TestSingleObjectReader {
882 fn from(obj: Value) -> TestSingleObjectReader {
883 if let Value::Record(fields) = obj {
884 let mut a = None;
885 let mut b = None;
886 let mut c = vec![];
887 for (field_name, v) in fields {
888 match (field_name.as_str(), v) {
889 ("a", Value::Long(i)) => a = Some(i),
890 ("b", Value::Double(d)) => b = Some(d),
891 ("c", Value::Array(v)) => {
892 for inner_val in v {
893 if let Value::String(s) = inner_val {
894 c.push(s);
895 }
896 }
897 }
898 (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
899 }
900 }
901 TestSingleObjectReader {
902 a: a.unwrap(),
903 b: b.unwrap(),
904 c,
905 }
906 } else {
907 panic!("Expected a Value::Record but was {obj:?}")
908 }
909 }
910 }
911
912 impl From<TestSingleObjectReader> for Value {
913 fn from(obj: TestSingleObjectReader) -> Value {
914 Value::Record(vec![
915 ("a".into(), obj.a.into()),
916 ("b".into(), obj.b.into()),
917 (
918 "c".into(),
919 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
920 ),
921 ])
922 }
923 }
924
925 #[test]
926 fn test_avro_3507_single_object_reader() -> TestResult {
927 let obj = TestSingleObjectReader {
928 a: 42,
929 b: 3.33,
930 c: vec!["cat".into(), "dog".into()],
931 };
932 let mut to_read = Vec::<u8>::new();
933 to_read.extend_from_slice(&[0xC3, 0x01]);
934 to_read.extend_from_slice(
935 &TestSingleObjectReader::get_schema()
936 .fingerprint::<Rabin>()
937 .bytes[..],
938 );
939 encode(
940 &obj.clone().into(),
941 &TestSingleObjectReader::get_schema(),
942 &mut to_read,
943 )
944 .expect("Encode should succeed");
945 let mut to_read = &to_read[..];
946 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
947 .expect("Schema should resolve");
948 let val = generic_reader
949 .read_value(&mut to_read)
950 .expect("Should read");
951 let expected_value: Value = obj.into();
952 assert_eq!(expected_value, val);
953
954 Ok(())
955 }
956
957 #[test]
958 fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
959 let obj = TestSingleObjectReader {
960 a: 42,
961 b: 3.33,
962 c: vec!["cat".into(), "dog".into()],
963 };
964 let to_read_1 = [0xC3, 0x01];
966 let mut to_read_2 = Vec::<u8>::new();
967 to_read_2.extend_from_slice(
968 &TestSingleObjectReader::get_schema()
969 .fingerprint::<Rabin>()
970 .bytes[..],
971 );
972 let mut to_read_3 = Vec::<u8>::new();
973 encode(
974 &obj.clone().into(),
975 &TestSingleObjectReader::get_schema(),
976 &mut to_read_3,
977 )
978 .expect("Encode should succeed");
979 let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
980 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
981 .expect("Schema should resolve");
982 let val = generic_reader
983 .read_value(&mut to_read)
984 .expect("Should read");
985 let expected_value: Value = obj.into();
986 assert_eq!(expected_value, val);
987
988 Ok(())
989 }
990
991 #[test]
992 fn test_avro_3507_reader_parity() -> TestResult {
993 let obj = TestSingleObjectReader {
994 a: 42,
995 b: 3.33,
996 c: vec!["cat".into(), "dog".into()],
997 };
998
999 let mut to_read = Vec::<u8>::new();
1000 to_read.extend_from_slice(&[0xC3, 0x01]);
1001 to_read.extend_from_slice(
1002 &TestSingleObjectReader::get_schema()
1003 .fingerprint::<Rabin>()
1004 .bytes[..],
1005 );
1006 encode(
1007 &obj.clone().into(),
1008 &TestSingleObjectReader::get_schema(),
1009 &mut to_read,
1010 )
1011 .expect("Encode should succeed");
1012 let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
1013 .expect("Schema should resolve");
1014 let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
1015 .expect("schema should resolve");
1016 let mut to_read1 = &to_read[..];
1017 let mut to_read2 = &to_read[..];
1018 let mut to_read3 = &to_read[..];
1019
1020 let val = generic_reader
1021 .read_value(&mut to_read1)
1022 .expect("Should read");
1023 let read_obj1 = specific_reader
1024 .read_from_value(&mut to_read2)
1025 .expect("Should read from value");
1026 let read_obj2 = specific_reader
1027 .read(&mut to_read3)
1028 .expect("Should read from deserilize");
1029 let expected_value: Value = obj.clone().into();
1030 assert_eq!(obj, read_obj1);
1031 assert_eq!(obj, read_obj2);
1032 assert_eq!(val, expected_value);
1033
1034 Ok(())
1035 }
1036
1037 #[cfg(not(feature = "snappy"))]
1038 #[test]
1039 fn test_avro_3549_read_not_enabled_codec() {
1040 let snappy_compressed_avro = vec![
1041 79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
1042 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
1043 110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
1044 103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
1045 44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
1046 108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
1047 58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
1048 101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
1049 203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
1050 209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
1051 ];
1052
1053 if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
1054 assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
1055 } else {
1056 panic!("Expected an error in the reading of the codec!");
1057 }
1058 }
1059}