1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 serde::{AvroSchema, ser_schema::SchemaAwareWriteSerializer},
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Number of buffered bytes after which a data block is flushed automatically, unless
/// the builder's `block_size` overrides it.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes opening every Avro object container file: `Obj` followed by version 1.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
35
36pub struct Writer<'a, W: Write> {
42 schema: &'a Schema,
43 writer: W,
44 resolved_schema: ResolvedSchema<'a>,
45 codec: Codec,
46 block_size: usize,
47 buffer: Vec<u8>,
48 num_values: usize,
49 marker: [u8; 16],
50 has_header: bool,
51 user_metadata: HashMap<String, Value>,
52}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56 #[builder]
57 pub fn builder(
58 schema: &'a Schema,
59 schemata: Option<Vec<&'a Schema>>,
60 writer: W,
61 #[builder(default = Codec::Null)] codec: Codec,
62 #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63 #[builder(default = generate_sync_marker())] marker: [u8; 16],
64 #[builder(default = false)]
68 has_header: bool,
69 #[builder(default)] user_metadata: HashMap<String, Value>,
70 ) -> AvroResult<Self> {
71 let resolved_schema = if let Some(schemata) = schemata {
72 ResolvedSchema::try_from(schemata)?
73 } else {
74 ResolvedSchema::try_from(schema)?
75 };
76 Ok(Self {
77 schema,
78 writer,
79 resolved_schema,
80 codec,
81 block_size,
82 buffer: Vec::with_capacity(block_size),
83 num_values: 0,
84 marker,
85 has_header,
86 user_metadata,
87 })
88 }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96 Writer::with_codec(schema, writer, Codec::Null)
97 }
98
99 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102 Self::builder()
103 .schema(schema)
104 .writer(writer)
105 .codec(codec)
106 .build()
107 }
108
109 pub fn with_schemata(
114 schema: &'a Schema,
115 schemata: Vec<&'a Schema>,
116 writer: W,
117 codec: Codec,
118 ) -> AvroResult<Self> {
119 Self::builder()
120 .schema(schema)
121 .schemata(schemata)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132 }
133
134 pub fn append_to_with_codec(
137 schema: &'a Schema,
138 writer: W,
139 codec: Codec,
140 marker: [u8; 16],
141 ) -> AvroResult<Self> {
142 Self::builder()
143 .schema(schema)
144 .writer(writer)
145 .codec(codec)
146 .marker(marker)
147 .has_header(true)
148 .build()
149 }
150
151 pub fn append_to_with_codec_schemata(
154 schema: &'a Schema,
155 schemata: Vec<&'a Schema>,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .schemata(schemata)
163 .writer(writer)
164 .codec(codec)
165 .marker(marker)
166 .has_header(true)
167 .build()
168 }
169
170 pub fn schema(&self) -> &'a Schema {
172 self.schema
173 }
174
175 #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
177 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
178 self.append_value(value)
179 }
180
181 pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189 let avro = value.into();
190 self.append_value_ref(&avro)
191 }
192
193 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201 if let Some(reason) = value.validate_internal(
202 self.schema,
203 self.resolved_schema.get_names(),
204 &self.schema.namespace(),
205 ) {
206 return Err(Details::ValidationWithReason {
207 value: value.clone(),
208 schema: self.schema.clone(),
209 reason,
210 }
211 .into());
212 }
213 self.unvalidated_append_value_ref(value)
214 }
215
216 pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228 let value = value.into();
229 self.unvalidated_append_value_ref(&value)
230 }
231
232 pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244 let n = self.maybe_write_header()?;
245 encode_internal(
246 value,
247 self.schema,
248 self.resolved_schema.get_names(),
249 &self.schema.namespace(),
250 &mut self.buffer,
251 )?;
252
253 self.num_values += 1;
254
255 if self.buffer.len() >= self.block_size {
256 return self.flush().map(|b| b + n);
257 }
258
259 Ok(n)
260 }
261
262 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272 let n = self.maybe_write_header()?;
273
274 let mut serializer = SchemaAwareWriteSerializer::new(
275 &mut self.buffer,
276 self.schema,
277 self.resolved_schema.get_names(),
278 None,
279 );
280 value.serialize(&mut serializer)?;
281 self.num_values += 1;
282
283 if self.buffer.len() >= self.block_size {
284 return self.flush().map(|b| b + n);
285 }
286
287 Ok(n)
288 }
289
290 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
297 where
298 I: IntoIterator<Item = T>,
299 {
300 let mut num_bytes = 0;
315 for value in values {
316 num_bytes += self.append_value(value)?;
317 }
318 num_bytes += self.flush()?;
319
320 Ok(num_bytes)
321 }
322
323 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
332 where
333 I: IntoIterator<Item = T>,
334 {
335 let mut num_bytes = 0;
350 for value in values {
351 num_bytes += self.append_ser(value)?;
352 }
353 num_bytes += self.flush()?;
354
355 Ok(num_bytes)
356 }
357
358 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
366 let mut num_bytes = 0;
367 for value in values {
368 num_bytes += self.append_value_ref(value)?;
369 }
370 num_bytes += self.flush()?;
371
372 Ok(num_bytes)
373 }
374
375 pub fn flush(&mut self) -> AvroResult<usize> {
383 let mut num_bytes = self.maybe_write_header()?;
384 if self.num_values == 0 {
385 return Ok(num_bytes);
386 }
387
388 self.codec.compress(&mut self.buffer)?;
389
390 let num_values = self.num_values;
391 let stream_len = self.buffer.len();
392
393 num_bytes += self.append_raw(&num_values.try_into()?, &Schema::Long)?
394 + self.append_raw(&stream_len.try_into()?, &Schema::Long)?
395 + self
396 .writer
397 .write(self.buffer.as_ref())
398 .map_err(Details::WriteBytes)?
399 + self.append_marker()?;
400
401 self.buffer.clear();
402 self.num_values = 0;
403
404 self.writer.flush().map_err(Details::FlushWriter)?;
405
406 Ok(num_bytes)
407 }
408
409 pub fn into_inner(mut self) -> AvroResult<W> {
414 self.maybe_write_header()?;
415 self.flush()?;
416
417 let mut this = ManuallyDrop::new(self);
418
419 let _buffer = std::mem::take(&mut this.buffer);
421 let _user_metadata = std::mem::take(&mut this.user_metadata);
422 unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
424
425 let writer = unsafe { std::ptr::read(&this.writer) };
427
428 Ok(writer)
429 }
430
431 pub fn get_ref(&self) -> &W {
436 &self.writer
437 }
438
439 pub fn get_mut(&mut self) -> &mut W {
446 &mut self.writer
447 }
448
449 fn append_marker(&mut self) -> AvroResult<usize> {
451 self.writer
454 .write(&self.marker)
455 .map_err(|e| Details::WriteMarker(e).into())
456 }
457
458 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
460 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
461 }
462
463 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
465 self.writer
466 .write(bytes)
467 .map_err(|e| Details::WriteBytes(e).into())
468 }
469
470 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
473 if !self.has_header {
474 if key.starts_with("avro.") {
475 return Err(Details::InvalidMetadataKey(key).into());
476 }
477 self.user_metadata
478 .insert(key, Value::Bytes(value.as_ref().to_vec()));
479 Ok(())
480 } else {
481 Err(Details::FileHeaderAlreadyWritten.into())
482 }
483 }
484
485 fn header(&self) -> Result<Vec<u8>, Error> {
487 let schema_bytes = serde_json::to_string(self.schema)
488 .map_err(Details::ConvertJsonToString)?
489 .into_bytes();
490
491 let mut metadata = HashMap::with_capacity(2);
492 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
493 if self.codec != Codec::Null {
494 metadata.insert("avro.codec", self.codec.into());
495 }
496 match self.codec {
497 #[cfg(feature = "bzip")]
498 Codec::Bzip2(settings) => {
499 metadata.insert(
500 "avro.codec.compression_level",
501 Value::Bytes(vec![settings.compression_level]),
502 );
503 }
504 #[cfg(feature = "xz")]
505 Codec::Xz(settings) => {
506 metadata.insert(
507 "avro.codec.compression_level",
508 Value::Bytes(vec![settings.compression_level]),
509 );
510 }
511 #[cfg(feature = "zstandard")]
512 Codec::Zstandard(settings) => {
513 metadata.insert(
514 "avro.codec.compression_level",
515 Value::Bytes(vec![settings.compression_level]),
516 );
517 }
518 _ => {}
519 }
520
521 for (k, v) in &self.user_metadata {
522 metadata.insert(k.as_str(), v.clone());
523 }
524
525 let mut header = Vec::new();
526 header.extend_from_slice(AVRO_OBJECT_HEADER);
527 encode(
528 &metadata.into(),
529 &Schema::map(Schema::Bytes).build(),
530 &mut header,
531 )?;
532 header.extend_from_slice(&self.marker);
533
534 Ok(header)
535 }
536
537 fn maybe_write_header(&mut self) -> AvroResult<usize> {
538 if !self.has_header {
539 let header = self.header()?;
540 let n = self.append_bytes(header.as_ref())?;
541 self.has_header = true;
542 Ok(n)
543 } else {
544 Ok(0)
545 }
546 }
547}
548
/// A writer that can be emptied in place so it can be reused, as required by
/// `Writer::reset`.
pub trait Clearable {
    /// Discards all data currently held by `self`.
    fn clear(&mut self);
}

impl Clearable for Vec<u8> {
    fn clear(&mut self) {
        // Same effect as `Vec::clear`: length drops to zero and capacity is kept.
        self.truncate(0);
    }
}
560
561impl<'a, W: Clearable + Write> Writer<'a, W> {
562 pub fn reset(&mut self) {
600 self.buffer.clear();
601 self.writer.clear();
602 self.has_header = false;
603 self.num_values = 0;
604 self.user_metadata.clear();
605 self.marker = generate_sync_marker();
606 }
607}
608
609impl<W: Write> Drop for Writer<'_, W> {
610 fn drop(&mut self) {
612 let _ = self.maybe_write_header();
613 let _ = self.flush();
614 }
615}
616
617fn write_avro_datum<T: Into<Value>, W: Write>(
622 schema: &Schema,
623 value: T,
624 writer: &mut W,
625) -> Result<(), Error> {
626 let avro = value.into();
627 if !avro.validate(schema) {
628 return Err(Details::Validation.into());
629 }
630 encode(&avro, schema, writer)?;
631 Ok(())
632}
633
634fn write_avro_datum_schemata<T: Into<Value>>(
635 schema: &Schema,
636 schemata: Vec<&Schema>,
637 value: T,
638 buffer: &mut Vec<u8>,
639) -> AvroResult<usize> {
640 let avro = value.into();
641 let rs = ResolvedSchema::try_from(schemata)?;
642 let names = rs.get_names();
643 let enclosing_namespace = schema.namespace();
644 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
645 return Err(Details::Validation.into());
646 }
647 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
648}
649
650pub struct GenericSingleObjectWriter {
654 buffer: Vec<u8>,
655 resolved: ResolvedOwnedSchema,
656}
657
658impl GenericSingleObjectWriter {
659 pub fn new_with_capacity(
660 schema: &Schema,
661 initial_buffer_cap: usize,
662 ) -> AvroResult<GenericSingleObjectWriter> {
663 let header_builder = RabinFingerprintHeader::from_schema(schema);
664 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
665 }
666
667 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
668 schema: &Schema,
669 initial_buffer_cap: usize,
670 header_builder: HB,
671 ) -> AvroResult<GenericSingleObjectWriter> {
672 let mut buffer = Vec::with_capacity(initial_buffer_cap);
673 let header = header_builder.build_header();
674 buffer.extend_from_slice(&header);
675
676 Ok(GenericSingleObjectWriter {
677 buffer,
678 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
679 })
680 }
681
682 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
683
684 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
686 let original_length = self.buffer.len();
687 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
688 Err(Details::IllegalSingleObjectWriterState.into())
689 } else {
690 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
691 writer
692 .write_all(&self.buffer)
693 .map_err(Details::WriteBytes)?;
694 let len = self.buffer.len();
695 self.buffer.truncate(original_length);
696 Ok(len)
697 }
698 }
699
700 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
702 self.write_value_ref(&v, writer)
703 }
704}
705
706pub struct SpecificSingleObjectWriter<T>
708where
709 T: AvroSchema,
710{
711 resolved: ResolvedOwnedSchema,
712 header: Vec<u8>,
713 _model: PhantomData<T>,
714}
715
716impl<T> SpecificSingleObjectWriter<T>
717where
718 T: AvroSchema,
719{
720 pub fn new() -> AvroResult<Self> {
721 let schema = T::get_schema();
722 let header = RabinFingerprintHeader::from_schema(&schema).build_header();
723 let resolved = ResolvedOwnedSchema::new(schema)?;
724 Ok(Self {
726 resolved,
727 header,
728 _model: PhantomData,
729 })
730 }
731
732 pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
733 let header = header_builder.build_header();
734 let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
735 Ok(Self {
736 resolved,
737 header,
738 _model: PhantomData,
739 })
740 }
741
742 #[deprecated(since = "0.22.0", note = "Use new() instead")]
744 pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
745 Self::new()
746 }
747}
748
749impl<T> SpecificSingleObjectWriter<T>
750where
751 T: AvroSchema + Into<Value>,
752{
753 pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
760 writer
761 .write_all(&self.header)
762 .map_err(Details::WriteBytes)?;
763 let value: Value = data.into();
764 let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
765 Ok(bytes + self.header.len())
766 }
767}
768
769impl<T> SpecificSingleObjectWriter<T>
770where
771 T: AvroSchema + Serialize,
772{
773 pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
780 writer
781 .write_all(&self.header)
782 .map_err(Details::WriteBytes)?;
783
784 let bytes = write_avro_datum_ref(
785 self.resolved.get_root_schema(),
786 self.resolved.get_names(),
787 data,
788 writer,
789 )?;
790
791 Ok(bytes + self.header.len())
792 }
793
794 pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
801 self.write_ref(&data, writer)
802 }
803}
804
805fn write_value_ref_owned_resolved<W: Write>(
806 resolved_schema: &ResolvedOwnedSchema,
807 value: &Value,
808 writer: &mut W,
809) -> AvroResult<usize> {
810 let root_schema = resolved_schema.get_root_schema();
811 if let Some(reason) = value.validate_internal(
812 root_schema,
813 resolved_schema.get_names(),
814 &root_schema.namespace(),
815 ) {
816 return Err(Details::ValidationWithReason {
817 value: value.clone(),
818 schema: root_schema.clone(),
819 reason,
820 }
821 .into());
822 }
823 encode_internal(
824 value,
825 root_schema,
826 resolved_schema.get_names(),
827 &root_schema.namespace(),
828 writer,
829 )
830}
831
832pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
838 let mut buffer = Vec::new();
839 write_avro_datum(schema, value, &mut buffer)?;
840 Ok(buffer)
841}
842
843pub fn write_avro_datum_ref<T: Serialize, W: Write>(
851 schema: &Schema,
852 names: &NamesRef,
853 data: &T,
854 writer: &mut W,
855) -> AvroResult<usize> {
856 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
857 data.serialize(&mut serializer)
858}
859
860pub fn to_avro_datum_schemata<T: Into<Value>>(
865 schema: &Schema,
866 schemata: Vec<&Schema>,
867 value: T,
868) -> AvroResult<Vec<u8>> {
869 let mut buffer = Vec::new();
870 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
871 Ok(buffer)
872}
873
874#[cfg(not(target_arch = "wasm32"))]
875fn generate_sync_marker() -> [u8; 16] {
876 rand::random()
877}
878
/// Generates a random 16-byte sync marker on wasm32 from four `quad_rand::rand` words,
/// each stored big-endian.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    // Exactly four draws, one per 4-byte chunk, in the same order as before.
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
889
890#[cfg(test)]
891mod tests {
892 use std::{cell::RefCell, rc::Rc};
893
894 use super::*;
895 use crate::{
896 Reader,
897 decimal::Decimal,
898 duration::{Days, Duration, Millis, Months},
899 headers::GlueSchemaUuidHeader,
900 rabin::Rabin,
901 schema::{DecimalSchema, FixedSchema, Name},
902 types::Record,
903 util::zig_i64,
904 };
905 use pretty_assertions::assert_eq;
906 use serde::{Deserialize, Serialize};
907 use uuid::Uuid;
908
909 use crate::schema::InnerDecimalSchema;
910 use crate::{codec::DeflateSettings, error::Details};
911 use apache_avro_test_helper::TestResult;
912
913 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
914
915 const SCHEMA: &str = r#"
916 {
917 "type": "record",
918 "name": "test",
919 "fields": [
920 {
921 "name": "a",
922 "type": "long",
923 "default": 42
924 },
925 {
926 "name": "b",
927 "type": "string"
928 }
929 ]
930 }
931 "#;
932
933 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
934
935 #[test]
936 fn test_to_avro_datum() -> TestResult {
937 let schema = Schema::parse_str(SCHEMA)?;
938 let mut record = Record::new(&schema).unwrap();
939 record.put("a", 27i64);
940 record.put("b", "foo");
941
942 let mut expected = Vec::new();
943 zig_i64(27, &mut expected)?;
944 zig_i64(3, &mut expected)?;
945 expected.extend([b'f', b'o', b'o']);
946
947 assert_eq!(to_avro_datum(&schema, record)?, expected);
948
949 Ok(())
950 }
951
952 #[test]
953 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
954 #[derive(Serialize)]
955 struct TestStruct {
956 a: i64,
957 b: String,
958 }
959
960 let schema = Schema::parse_str(SCHEMA)?;
961 let mut writer: Vec<u8> = Vec::new();
962 let data = TestStruct {
963 a: 27,
964 b: "foo".to_string(),
965 };
966
967 let mut expected = Vec::new();
968 zig_i64(27, &mut expected)?;
969 zig_i64(3, &mut expected)?;
970 expected.extend([b'f', b'o', b'o']);
971
972 let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
973
974 assert_eq!(bytes, expected.len());
975 assert_eq!(writer, expected);
976
977 Ok(())
978 }
979
980 #[test]
981 fn avro_rs_220_flush_write_header() -> TestResult {
982 let schema = Schema::parse_str(SCHEMA)?;
983
984 let mut writer = Writer::new(&schema, Vec::new())?;
986 writer.flush()?;
987 let result = writer.into_inner()?;
988 assert_eq!(result.len(), 147);
989
990 let mut writer = Writer::builder()
992 .has_header(true)
993 .schema(&schema)
994 .writer(Vec::new())
995 .build()?;
996 writer.flush()?;
997 let result = writer.into_inner()?;
998 assert_eq!(result.len(), 0);
999
1000 Ok(())
1001 }
1002
1003 #[test]
1004 fn test_union_not_null() -> TestResult {
1005 let schema = Schema::parse_str(UNION_SCHEMA)?;
1006 let union = Value::Union(1, Box::new(Value::Long(3)));
1007
1008 let mut expected = Vec::new();
1009 zig_i64(1, &mut expected)?;
1010 zig_i64(3, &mut expected)?;
1011
1012 assert_eq!(to_avro_datum(&schema, union)?, expected);
1013
1014 Ok(())
1015 }
1016
1017 #[test]
1018 fn test_union_null() -> TestResult {
1019 let schema = Schema::parse_str(UNION_SCHEMA)?;
1020 let union = Value::Union(0, Box::new(Value::Null));
1021
1022 let mut expected = Vec::new();
1023 zig_i64(0, &mut expected)?;
1024
1025 assert_eq!(to_avro_datum(&schema, union)?, expected);
1026
1027 Ok(())
1028 }
1029
1030 fn logical_type_test<T: Into<Value> + Clone>(
1031 schema_str: &'static str,
1032
1033 expected_schema: &Schema,
1034 value: Value,
1035
1036 raw_schema: &Schema,
1037 raw_value: T,
1038 ) -> TestResult {
1039 let schema = Schema::parse_str(schema_str)?;
1040 assert_eq!(&schema, expected_schema);
1041 let ser = to_avro_datum(&schema, value.clone())?;
1043 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
1044 assert_eq!(ser, raw_ser);
1045
1046 let mut r = ser.as_slice();
1048 let de = crate::from_avro_datum(&schema, &mut r, None)?;
1049 assert_eq!(de, value);
1050 Ok(())
1051 }
1052
1053 #[test]
1054 fn date() -> TestResult {
1055 logical_type_test(
1056 r#"{"type": "int", "logicalType": "date"}"#,
1057 &Schema::Date,
1058 Value::Date(1_i32),
1059 &Schema::Int,
1060 1_i32,
1061 )
1062 }
1063
1064 #[test]
1065 fn time_millis() -> TestResult {
1066 logical_type_test(
1067 r#"{"type": "int", "logicalType": "time-millis"}"#,
1068 &Schema::TimeMillis,
1069 Value::TimeMillis(1_i32),
1070 &Schema::Int,
1071 1_i32,
1072 )
1073 }
1074
1075 #[test]
1076 fn time_micros() -> TestResult {
1077 logical_type_test(
1078 r#"{"type": "long", "logicalType": "time-micros"}"#,
1079 &Schema::TimeMicros,
1080 Value::TimeMicros(1_i64),
1081 &Schema::Long,
1082 1_i64,
1083 )
1084 }
1085
1086 #[test]
1087 fn timestamp_millis() -> TestResult {
1088 logical_type_test(
1089 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
1090 &Schema::TimestampMillis,
1091 Value::TimestampMillis(1_i64),
1092 &Schema::Long,
1093 1_i64,
1094 )
1095 }
1096
1097 #[test]
1098 fn timestamp_micros() -> TestResult {
1099 logical_type_test(
1100 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
1101 &Schema::TimestampMicros,
1102 Value::TimestampMicros(1_i64),
1103 &Schema::Long,
1104 1_i64,
1105 )
1106 }
1107
1108 #[test]
1109 fn decimal_fixed() -> TestResult {
1110 let size = 30;
1111 let fixed = FixedSchema {
1112 name: Name::new("decimal")?,
1113 aliases: None,
1114 doc: None,
1115 size,
1116 attributes: Default::default(),
1117 };
1118 let inner = InnerDecimalSchema::Fixed(fixed.clone());
1119 let value = vec![0u8; size];
1120 logical_type_test(
1121 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1122 &Schema::Decimal(DecimalSchema {
1123 precision: 20,
1124 scale: 5,
1125 inner,
1126 }),
1127 Value::Decimal(Decimal::from(value.clone())),
1128 &Schema::Fixed(fixed),
1129 Value::Fixed(size, value),
1130 )
1131 }
1132
1133 #[test]
1134 fn decimal_bytes() -> TestResult {
1135 let value = vec![0u8; 10];
1136 logical_type_test(
1137 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1138 &Schema::Decimal(DecimalSchema {
1139 precision: 4,
1140 scale: 3,
1141 inner: InnerDecimalSchema::Bytes,
1142 }),
1143 Value::Decimal(Decimal::from(value.clone())),
1144 &Schema::Bytes,
1145 value,
1146 )
1147 }
1148
1149 #[test]
1150 fn duration() -> TestResult {
1151 let inner = Schema::Fixed(FixedSchema {
1152 name: Name::new("duration")?,
1153 aliases: None,
1154 doc: None,
1155 size: 12,
1156 attributes: Default::default(),
1157 });
1158 let value = Value::Duration(Duration::new(
1159 Months::new(256),
1160 Days::new(512),
1161 Millis::new(1024),
1162 ));
1163 logical_type_test(
1164 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1165 &Schema::Duration(FixedSchema {
1166 name: Name::try_from("duration").expect("Name is valid"),
1167 aliases: None,
1168 doc: None,
1169 size: 12,
1170 attributes: Default::default(),
1171 }),
1172 value,
1173 &inner,
1174 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1175 )
1176 }
1177
1178 #[test]
1179 fn test_writer_append() -> TestResult {
1180 let schema = Schema::parse_str(SCHEMA)?;
1181 let mut writer = Writer::new(&schema, Vec::new())?;
1182
1183 let mut record = Record::new(&schema).unwrap();
1184 record.put("a", 27i64);
1185 record.put("b", "foo");
1186
1187 let n1 = writer.append_value(record.clone())?;
1188 let n2 = writer.append_value(record.clone())?;
1189 let n3 = writer.flush()?;
1190 let result = writer.into_inner()?;
1191
1192 assert_eq!(n1 + n2 + n3, result.len());
1193
1194 let mut data = Vec::new();
1195 zig_i64(27, &mut data)?;
1196 zig_i64(3, &mut data)?;
1197 data.extend(b"foo");
1198 data.extend(data.clone());
1199
1200 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1202 let last_data_byte = result.len() - 16;
1204 assert_eq!(
1205 &result[last_data_byte - data.len()..last_data_byte],
1206 data.as_slice()
1207 );
1208
1209 Ok(())
1210 }
1211
1212 #[test]
1213 fn test_writer_extend() -> TestResult {
1214 let schema = Schema::parse_str(SCHEMA)?;
1215 let mut writer = Writer::new(&schema, Vec::new())?;
1216
1217 let mut record = Record::new(&schema).unwrap();
1218 record.put("a", 27i64);
1219 record.put("b", "foo");
1220 let record_copy = record.clone();
1221 let records = vec![record, record_copy];
1222
1223 let n1 = writer.extend(records)?;
1224 let n2 = writer.flush()?;
1225 let result = writer.into_inner()?;
1226
1227 assert_eq!(n1 + n2, result.len());
1228
1229 let mut data = Vec::new();
1230 zig_i64(27, &mut data)?;
1231 zig_i64(3, &mut data)?;
1232 data.extend(b"foo");
1233 data.extend(data.clone());
1234
1235 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1237 let last_data_byte = result.len() - 16;
1239 assert_eq!(
1240 &result[last_data_byte - data.len()..last_data_byte],
1241 data.as_slice()
1242 );
1243
1244 Ok(())
1245 }
1246
1247 #[derive(Debug, Clone, Deserialize, Serialize)]
1248 struct TestSerdeSerialize {
1249 a: i64,
1250 b: String,
1251 }
1252
1253 #[test]
1254 fn test_writer_append_ser() -> TestResult {
1255 let schema = Schema::parse_str(SCHEMA)?;
1256 let mut writer = Writer::new(&schema, Vec::new())?;
1257
1258 let record = TestSerdeSerialize {
1259 a: 27,
1260 b: "foo".to_owned(),
1261 };
1262
1263 let n1 = writer.append_ser(record)?;
1264 let n2 = writer.flush()?;
1265 let result = writer.into_inner()?;
1266
1267 assert_eq!(n1 + n2, result.len());
1268
1269 let mut data = Vec::new();
1270 zig_i64(27, &mut data)?;
1271 zig_i64(3, &mut data)?;
1272 data.extend(b"foo");
1273
1274 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1276 let last_data_byte = result.len() - 16;
1278 assert_eq!(
1279 &result[last_data_byte - data.len()..last_data_byte],
1280 data.as_slice()
1281 );
1282
1283 Ok(())
1284 }
1285
1286 #[test]
1287 fn test_writer_extend_ser() -> TestResult {
1288 let schema = Schema::parse_str(SCHEMA)?;
1289 let mut writer = Writer::new(&schema, Vec::new())?;
1290
1291 let record = TestSerdeSerialize {
1292 a: 27,
1293 b: "foo".to_owned(),
1294 };
1295 let record_copy = record.clone();
1296 let records = vec![record, record_copy];
1297
1298 let n1 = writer.extend_ser(records)?;
1299 let n2 = writer.flush()?;
1300 let result = writer.into_inner()?;
1301
1302 assert_eq!(n1 + n2, result.len());
1303
1304 let mut data = Vec::new();
1305 zig_i64(27, &mut data)?;
1306 zig_i64(3, &mut data)?;
1307 data.extend(b"foo");
1308 data.extend(data.clone());
1309
1310 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1312 let last_data_byte = result.len() - 16;
1314 assert_eq!(
1315 &result[last_data_byte - data.len()..last_data_byte],
1316 data.as_slice()
1317 );
1318
1319 Ok(())
1320 }
1321
1322 fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1323 Writer::with_codec(
1324 schema,
1325 Vec::new(),
1326 Codec::Deflate(DeflateSettings::default()),
1327 )
1328 }
1329
1330 fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1331 Writer::builder()
1332 .writer(Vec::new())
1333 .schema(schema)
1334 .codec(Codec::Deflate(DeflateSettings::default()))
1335 .block_size(100)
1336 .build()
1337 }
1338
1339 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1340 let mut record = Record::new(schema).unwrap();
1341 record.put("a", 27i64);
1342 record.put("b", "foo");
1343
1344 let n1 = writer.append_value(record.clone())?;
1345 let n2 = writer.append_value(record.clone())?;
1346 let n3 = writer.flush()?;
1347 let result = writer.into_inner()?;
1348
1349 assert_eq!(n1 + n2 + n3, result.len());
1350
1351 let mut data = Vec::new();
1352 zig_i64(27, &mut data)?;
1353 zig_i64(3, &mut data)?;
1354 data.extend(b"foo");
1355 data.extend(data.clone());
1356 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1357
1358 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1360 let last_data_byte = result.len() - 16;
1362 assert_eq!(
1363 &result[last_data_byte - data.len()..last_data_byte],
1364 data.as_slice()
1365 );
1366
1367 Ok(())
1368 }
1369
1370 #[test]
1371 fn test_writer_with_codec() -> TestResult {
1372 let schema = Schema::parse_str(SCHEMA)?;
1373 let writer = make_writer_with_codec(&schema)?;
1374 check_writer(writer, &schema)
1375 }
1376
1377 #[test]
1378 fn test_writer_with_builder() -> TestResult {
1379 let schema = Schema::parse_str(SCHEMA)?;
1380 let writer = make_writer_with_builder(&schema)?;
1381 check_writer(writer, &schema)
1382 }
1383
1384 #[test]
1385 fn test_logical_writer() -> TestResult {
1386 const LOGICAL_TYPE_SCHEMA: &str = r#"
1387 {
1388 "type": "record",
1389 "name": "logical_type_test",
1390 "fields": [
1391 {
1392 "name": "a",
1393 "type": [
1394 "null",
1395 {
1396 "type": "long",
1397 "logicalType": "timestamp-micros"
1398 }
1399 ]
1400 }
1401 ]
1402 }
1403 "#;
1404 let codec = Codec::Deflate(DeflateSettings::default());
1405 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1406 let mut writer = Writer::builder()
1407 .schema(&schema)
1408 .codec(codec)
1409 .writer(Vec::new())
1410 .build()?;
1411
1412 let mut record1 = Record::new(&schema).unwrap();
1413 record1.put(
1414 "a",
1415 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1416 );
1417
1418 let mut record2 = Record::new(&schema).unwrap();
1419 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1420
1421 let n1 = writer.append_value(record1)?;
1422 let n2 = writer.append_value(record2)?;
1423 let n3 = writer.flush()?;
1424 let result = writer.into_inner()?;
1425
1426 assert_eq!(n1 + n2 + n3, result.len());
1427
1428 let mut data = Vec::new();
1429 zig_i64(1, &mut data)?;
1431 zig_i64(1234, &mut data)?;
1432
1433 zig_i64(0, &mut data)?;
1435 codec.compress(&mut data)?;
1436
1437 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1439 let last_data_byte = result.len() - 16;
1441 assert_eq!(
1442 &result[last_data_byte - data.len()..last_data_byte],
1443 data.as_slice()
1444 );
1445
1446 Ok(())
1447 }
1448
1449 #[test]
1450 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1451 let schema = Schema::parse_str(SCHEMA)?;
1452 let mut writer = Writer::new(&schema, Vec::new())?;
1453
1454 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1455 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1456 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1457 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1458
1459 let mut record = Record::new(&schema).unwrap();
1460 record.put("a", 27i64);
1461 record.put("b", "foo");
1462
1463 writer.append_value(record.clone())?;
1464 writer.append_value(record.clone())?;
1465 writer.flush()?;
1466 let result = writer.into_inner()?;
1467
1468 assert_eq!(result.len(), 244);
1469
1470 Ok(())
1471 }
1472
1473 #[test]
1474 fn test_avro_3881_metadata_empty_body() -> TestResult {
1475 let schema = Schema::parse_str(SCHEMA)?;
1476 let mut writer = Writer::new(&schema, Vec::new())?;
1477 writer.add_user_metadata("a".to_string(), "b")?;
1478 let result = writer.into_inner()?;
1479
1480 let reader = Reader::builder(&result[..])
1481 .reader_schema(&schema)
1482 .build()?;
1483 let mut expected = HashMap::new();
1484 expected.insert("a".to_string(), vec![b'b']);
1485 assert_eq!(reader.user_metadata(), &expected);
1486 assert_eq!(reader.into_iter().count(), 0);
1487
1488 Ok(())
1489 }
1490
1491 #[test]
1492 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1493 let schema = Schema::parse_str(SCHEMA)?;
1494 let mut writer = Writer::new(&schema, Vec::new())?;
1495
1496 let mut record = Record::new(&schema).unwrap();
1497 record.put("a", 27i64);
1498 record.put("b", "foo");
1499 writer.append_value(record.clone())?;
1500
1501 match writer
1502 .add_user_metadata("stringKey".to_string(), String::from("value2"))
1503 .map_err(Error::into_details)
1504 {
1505 Err(e @ Details::FileHeaderAlreadyWritten) => {
1506 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1507 }
1508 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1509 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1510 }
1511
1512 Ok(())
1513 }
1514
1515 #[test]
1516 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1517 let schema = Schema::parse_str(SCHEMA)?;
1518 let mut writer = Writer::new(&schema, Vec::new())?;
1519
1520 let key = "avro.stringKey".to_string();
1521 match writer
1522 .add_user_metadata(key.clone(), "value")
1523 .map_err(Error::into_details)
1524 {
1525 Err(ref e @ Details::InvalidMetadataKey(_)) => {
1526 assert_eq!(
1527 e.to_string(),
1528 format!(
1529 "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1530 )
1531 )
1532 }
1533 Err(e) => panic!(
1534 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1535 ),
1536 Ok(_) => {
1537 panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1538 }
1539 }
1540
1541 Ok(())
1542 }
1543
1544 #[test]
1545 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1546 let schema = Schema::parse_str(SCHEMA)?;
1547
1548 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1549 user_meta_data.insert(
1550 "stringKey".to_string(),
1551 Value::String("stringValue".to_string()),
1552 );
1553 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1554 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1555
1556 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1557 .writer(Vec::new())
1558 .schema(&schema)
1559 .user_metadata(user_meta_data.clone())
1560 .build()?;
1561
1562 assert_eq!(writer.user_metadata, user_meta_data);
1563
1564 Ok(())
1565 }
1566
    /// Test fixture for the single-object writers. Its Avro schema is supplied
    /// by a hand-written `AvroSchema` impl with fields `a` (long), `b` (double)
    /// and `c` (array of string).
    // Keep field declaration order in sync with the schema's field order:
    // the derived `Serialize` emits fields in declaration order.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
1573
1574 impl AvroSchema for TestSingleObjectWriter {
1575 fn get_schema() -> Schema {
1576 let schema = r#"
1577 {
1578 "type":"record",
1579 "name":"TestSingleObjectWrtierSerialize",
1580 "fields":[
1581 {
1582 "name":"a",
1583 "type":"long"
1584 },
1585 {
1586 "name":"b",
1587 "type":"double"
1588 },
1589 {
1590 "name":"c",
1591 "type":{
1592 "type":"array",
1593 "items":"string"
1594 }
1595 }
1596 ]
1597 }
1598 "#;
1599 Schema::parse_str(schema).unwrap()
1600 }
1601 }
1602
1603 impl From<TestSingleObjectWriter> for Value {
1604 fn from(obj: TestSingleObjectWriter) -> Value {
1605 Value::Record(vec![
1606 ("a".into(), obj.a.into()),
1607 ("b".into(), obj.b.into()),
1608 (
1609 "c".into(),
1610 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1611 ),
1612 ])
1613 }
1614 }
1615
1616 #[test]
1617 fn test_single_object_writer() -> TestResult {
1618 let mut buf: Vec<u8> = Vec::new();
1619 let obj = TestSingleObjectWriter {
1620 a: 300,
1621 b: 34.555,
1622 c: vec!["cat".into(), "dog".into()],
1623 };
1624 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1625 &TestSingleObjectWriter::get_schema(),
1626 1024,
1627 )
1628 .expect("Should resolve schema");
1629 let value = obj.into();
1630 let written_bytes = writer
1631 .write_value_ref(&value, &mut buf)
1632 .expect("Error serializing properly");
1633
1634 assert!(buf.len() > 10, "no bytes written");
1635 assert_eq!(buf.len(), written_bytes);
1636 assert_eq!(buf[0], 0xC3);
1637 assert_eq!(buf[1], 0x01);
1638 assert_eq!(
1639 &buf[2..10],
1640 &TestSingleObjectWriter::get_schema()
1641 .fingerprint::<Rabin>()
1642 .bytes[..]
1643 );
1644 let mut msg_binary = Vec::new();
1645 encode(
1646 &value,
1647 &TestSingleObjectWriter::get_schema(),
1648 &mut msg_binary,
1649 )
1650 .expect("encode should have failed by here as a dependency of any writing");
1651 assert_eq!(&buf[10..], &msg_binary[..]);
1652
1653 Ok(())
1654 }
1655
1656 #[test]
1657 fn test_single_object_writer_with_header_builder() -> TestResult {
1658 let mut buf: Vec<u8> = Vec::new();
1659 let obj = TestSingleObjectWriter {
1660 a: 300,
1661 b: 34.555,
1662 c: vec!["cat".into(), "dog".into()],
1663 };
1664 let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1665 let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1666 let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1667 &TestSingleObjectWriter::get_schema(),
1668 1024,
1669 header_builder,
1670 )
1671 .expect("Should resolve schema");
1672 let value = obj.into();
1673 writer
1674 .write_value_ref(&value, &mut buf)
1675 .expect("Error serializing properly");
1676
1677 assert_eq!(buf[0], 0x03);
1678 assert_eq!(buf[1], 0x00);
1679 assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1680 Ok(())
1681 }
1682
1683 #[test]
1684 fn test_writer_parity() -> TestResult {
1685 let obj1 = TestSingleObjectWriter {
1686 a: 300,
1687 b: 34.555,
1688 c: vec!["cat".into(), "dog".into()],
1689 };
1690
1691 let mut buf1: Vec<u8> = Vec::new();
1692 let mut buf2: Vec<u8> = Vec::new();
1693 let mut buf3: Vec<u8> = Vec::new();
1694 let mut buf4: Vec<u8> = Vec::new();
1695
1696 let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1697 &TestSingleObjectWriter::get_schema(),
1698 1024,
1699 )
1700 .expect("Should resolve schema");
1701 let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
1702 .expect("Resolved should pass");
1703 specific_writer
1704 .write_ref(&obj1, &mut buf1)
1705 .expect("Serialization expected");
1706 specific_writer
1707 .write_ref(&obj1, &mut buf2)
1708 .expect("Serialization expected");
1709 specific_writer
1710 .write_value(obj1.clone(), &mut buf3)
1711 .expect("Serialization expected");
1712
1713 generic_writer
1714 .write_value(obj1.into(), &mut buf4)
1715 .expect("Serialization expected");
1716
1717 assert_eq!(buf1, buf2);
1718 assert_eq!(buf2, buf3);
1719 assert_eq!(buf3, buf4);
1720
1721 Ok(())
1722 }
1723
1724 #[test]
1725 fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1726 const SCHEMA: &str = r#"
1727 {
1728 "type": "record",
1729 "name": "Conference",
1730 "fields": [
1731 {"type": "string", "name": "name"},
1732 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1733 ]
1734 }"#;
1735
1736 #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1737 pub struct Conference {
1738 pub name: String,
1739 pub time: Option<i64>,
1740 }
1741
1742 let conf = Conference {
1743 name: "RustConf".to_string(),
1744 time: Some(1234567890),
1745 };
1746
1747 let schema = Schema::parse_str(SCHEMA)?;
1748 let mut writer = Writer::new(&schema, Vec::new())?;
1749
1750 let bytes = writer.append_ser(conf)?;
1751
1752 assert_eq!(182, bytes);
1753
1754 Ok(())
1755 }
1756
1757 #[test]
1758 fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1759 const SCHEMA: &str = r#"
1760 {
1761 "type": "record",
1762 "name": "Conference",
1763 "fields": [
1764 {"type": "string", "name": "name"},
1765 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1766 ]
1767 }"#;
1768
1769 #[derive(Debug, PartialEq, Clone, Serialize)]
1770 pub struct Conference {
1771 pub name: String,
1772 pub time: Option<f64>, }
1774
1775 let conf = Conference {
1776 name: "RustConf".to_string(),
1777 time: Some(12345678.90),
1778 };
1779
1780 let schema = Schema::parse_str(SCHEMA)?;
1781 let mut writer = Writer::new(&schema, Vec::new())?;
1782
1783 match writer.append_ser(conf) {
1784 Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1785 Err(e) => {
1786 assert_eq!(
1787 e.to_string(),
1788 r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, fields: [RecordField { name: "name", schema: String, .. }, RecordField { name: "date", aliases: ["time2", "time"], schema: Union(UnionSchema { schemas: [Null, Long] }), .. }], .. }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long] }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1789 );
1790 }
1791 }
1792 Ok(())
1793 }
1794
1795 #[test]
1796 fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1797 const SCHEMA: &str = r#"
1798 {
1799 "type": "record",
1800 "name": "ExampleSchema",
1801 "fields": [
1802 {"name": "exampleField", "type": "string"}
1803 ]
1804 }
1805 "#;
1806
1807 #[derive(Clone, Default)]
1808 struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1809
1810 impl TestBuffer {
1811 fn len(&self) -> usize {
1812 self.0.borrow().len()
1813 }
1814 }
1815
1816 impl Write for TestBuffer {
1817 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1818 self.0.borrow_mut().write(buf)
1819 }
1820
1821 fn flush(&mut self) -> std::io::Result<()> {
1822 Ok(())
1823 }
1824 }
1825
1826 let shared_buffer = TestBuffer::default();
1827
1828 let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1829
1830 let schema = Schema::parse_str(SCHEMA)?;
1831
1832 let mut writer = Writer::new(&schema, buffered_writer)?;
1833
1834 let mut record = Record::new(writer.schema()).unwrap();
1835 record.put("exampleField", "value");
1836
1837 writer.append_value(record)?;
1838 writer.flush()?;
1839
1840 assert_eq!(
1841 shared_buffer.len(),
1842 151,
1843 "the test buffer was not fully written to after Writer::flush was called"
1844 );
1845
1846 Ok(())
1847 }
1848
1849 #[test]
1850 fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
1851 #[derive(Serialize)]
1852 struct Recursive {
1853 field: bool,
1854 recurse: Option<Box<Recursive>>,
1855 }
1856
1857 impl AvroSchema for Recursive {
1858 fn get_schema() -> Schema {
1859 Schema::parse_str(
1860 r#"{
1861 "name": "Recursive",
1862 "type": "record",
1863 "fields": [
1864 { "name": "field", "type": "boolean" },
1865 { "name": "recurse", "type": ["null", "Recursive"] }
1866 ]
1867 }"#,
1868 )
1869 .unwrap()
1870 }
1871 }
1872
1873 let mut buffer = Vec::new();
1874 let writer = SpecificSingleObjectWriter::new()?;
1875
1876 writer.write(
1877 Recursive {
1878 field: true,
1879 recurse: Some(Box::new(Recursive {
1880 field: false,
1881 recurse: None,
1882 })),
1883 },
1884 &mut buffer,
1885 )?;
1886 assert_eq!(
1887 buffer,
1888 &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
1889 );
1890
1891 Ok(())
1892 }
1893
1894 #[test]
1895 fn avro_rs_310_append_unvalidated_value() -> TestResult {
1896 let schema = Schema::String;
1897 let value = Value::Int(1);
1898
1899 let mut writer = Writer::new(&schema, Vec::new())?;
1900 writer.unvalidated_append_value_ref(&value)?;
1901 writer.unvalidated_append_value(value)?;
1902 let buffer = writer.into_inner()?;
1903
1904 assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);
1906
1907 let mut writer = Writer::new(&schema, Vec::new())?;
1908 let value = Value::Int(1);
1909 let err = writer.append_value_ref(&value).unwrap_err();
1910 assert_eq!(
1911 err.to_string(),
1912 "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1913 );
1914 let err = writer.append_value(value).unwrap_err();
1915 assert_eq!(
1916 err.to_string(),
1917 "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1918 );
1919
1920 Ok(())
1921 }
1922
1923 #[test]
1924 fn avro_rs_469_reset_writer() -> TestResult {
1925 let schema = Schema::Boolean;
1926 let values = [true, false, true, false];
1927 let mut writer = Writer::new(&schema, Vec::new())?;
1928
1929 for value in values {
1930 writer.append_value(value)?;
1931 }
1932
1933 writer.flush()?;
1934 let first_buffer = writer.get_ref().clone();
1935
1936 writer.reset();
1937 assert_eq!(writer.get_ref().len(), 0);
1938
1939 for value in values {
1940 writer.append_value(value)?;
1941 }
1942
1943 writer.flush()?;
1944 let second_buffer = writer.get_ref().clone();
1945 assert_eq!(first_buffer.len(), second_buffer.len());
1946 let len = first_buffer.len();
1952 let header = len - 16 - 6 - 16;
1953 let data = header + 16;
1954 assert_eq!(
1955 first_buffer[..header],
1956 second_buffer[..header],
1957 "Written header must be the same, excluding sync marker"
1958 );
1959 assert_ne!(
1960 first_buffer[header..data],
1961 second_buffer[header..data],
1962 "Sync markers should be different"
1963 );
1964 assert_eq!(
1965 first_buffer[data..data + 6],
1966 second_buffer[data..data + 6],
1967 "Written data must be the same"
1968 );
1969 assert_ne!(
1970 first_buffer[len - 16..],
1971 second_buffer[len - 16..],
1972 "Sync markers should be different"
1973 );
1974
1975 Ok(())
1976 }
1977}