1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 serde::{AvroSchema, ser_schema::SchemaAwareWriteSerializer},
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default number of buffered bytes at which an append flushes the current data block.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes that open every Avro object container file: `Obj` followed by version byte 1.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
35
36pub struct Writer<'a, W: Write> {
42 schema: &'a Schema,
43 writer: W,
44 resolved_schema: ResolvedSchema<'a>,
45 codec: Codec,
46 block_size: usize,
47 buffer: Vec<u8>,
48 num_values: usize,
49 marker: [u8; 16],
50 has_header: bool,
51 user_metadata: HashMap<String, Value>,
52}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56 #[builder]
57 pub fn builder(
58 schema: &'a Schema,
59 schemata: Option<Vec<&'a Schema>>,
60 writer: W,
61 #[builder(default = Codec::Null)] codec: Codec,
62 #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63 #[builder(default = generate_sync_marker())] marker: [u8; 16],
64 #[builder(default = false)]
68 has_header: bool,
69 #[builder(default)] user_metadata: HashMap<String, Value>,
70 ) -> AvroResult<Self> {
71 let resolved_schema = if let Some(schemata) = schemata {
72 ResolvedSchema::try_from(schemata)?
73 } else {
74 ResolvedSchema::try_from(schema)?
75 };
76 Ok(Self {
77 schema,
78 writer,
79 resolved_schema,
80 codec,
81 block_size,
82 buffer: Vec::with_capacity(block_size),
83 num_values: 0,
84 marker,
85 has_header,
86 user_metadata,
87 })
88 }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96 Writer::with_codec(schema, writer, Codec::Null)
97 }
98
99 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102 Self::builder()
103 .schema(schema)
104 .writer(writer)
105 .codec(codec)
106 .build()
107 }
108
109 pub fn with_schemata(
114 schema: &'a Schema,
115 schemata: Vec<&'a Schema>,
116 writer: W,
117 codec: Codec,
118 ) -> AvroResult<Self> {
119 Self::builder()
120 .schema(schema)
121 .schemata(schemata)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132 }
133
134 pub fn append_to_with_codec(
137 schema: &'a Schema,
138 writer: W,
139 codec: Codec,
140 marker: [u8; 16],
141 ) -> AvroResult<Self> {
142 Self::builder()
143 .schema(schema)
144 .writer(writer)
145 .codec(codec)
146 .marker(marker)
147 .has_header(true)
148 .build()
149 }
150
151 pub fn append_to_with_codec_schemata(
154 schema: &'a Schema,
155 schemata: Vec<&'a Schema>,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .schemata(schemata)
163 .writer(writer)
164 .codec(codec)
165 .marker(marker)
166 .has_header(true)
167 .build()
168 }
169
170 pub fn schema(&self) -> &'a Schema {
172 self.schema
173 }
174
175 #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
177 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
178 self.append_value(value)
179 }
180
181 pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189 let avro = value.into();
190 self.append_value_ref(&avro)
191 }
192
193 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201 if let Some(reason) = value.validate_internal(
202 self.schema,
203 self.resolved_schema.get_names(),
204 &self.schema.namespace(),
205 ) {
206 return Err(Details::ValidationWithReason {
207 value: value.clone(),
208 schema: self.schema.clone(),
209 reason,
210 }
211 .into());
212 }
213 self.unvalidated_append_value_ref(value)
214 }
215
216 pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228 let value = value.into();
229 self.unvalidated_append_value_ref(&value)
230 }
231
232 pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244 let n = self.maybe_write_header()?;
245 encode_internal(
246 value,
247 self.schema,
248 self.resolved_schema.get_names(),
249 &self.schema.namespace(),
250 &mut self.buffer,
251 )?;
252
253 self.num_values += 1;
254
255 if self.buffer.len() >= self.block_size {
256 return self.flush().map(|b| b + n);
257 }
258
259 Ok(n)
260 }
261
262 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272 let n = self.maybe_write_header()?;
273
274 let mut serializer = SchemaAwareWriteSerializer::new(
275 &mut self.buffer,
276 self.schema,
277 self.resolved_schema.get_names(),
278 None,
279 );
280 value.serialize(&mut serializer)?;
281 self.num_values += 1;
282
283 if self.buffer.len() >= self.block_size {
284 return self.flush().map(|b| b + n);
285 }
286
287 Ok(n)
288 }
289
290 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
297 where
298 I: IntoIterator<Item = T>,
299 {
300 let mut num_bytes = 0;
315 for value in values {
316 num_bytes += self.append_value(value)?;
317 }
318 num_bytes += self.flush()?;
319
320 Ok(num_bytes)
321 }
322
323 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
332 where
333 I: IntoIterator<Item = T>,
334 {
335 let mut num_bytes = 0;
350 for value in values {
351 num_bytes += self.append_ser(value)?;
352 }
353 num_bytes += self.flush()?;
354
355 Ok(num_bytes)
356 }
357
358 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
366 let mut num_bytes = 0;
367 for value in values {
368 num_bytes += self.append_value_ref(value)?;
369 }
370 num_bytes += self.flush()?;
371
372 Ok(num_bytes)
373 }
374
375 pub fn flush(&mut self) -> AvroResult<usize> {
383 let mut num_bytes = self.maybe_write_header()?;
384 if self.num_values == 0 {
385 return Ok(num_bytes);
386 }
387
388 self.codec.compress(&mut self.buffer)?;
389
390 let num_values = self.num_values;
391 let stream_len = self.buffer.len();
392
393 num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
394 + self.append_raw(&stream_len.into(), &Schema::Long)?
395 + self
396 .writer
397 .write(self.buffer.as_ref())
398 .map_err(Details::WriteBytes)?
399 + self.append_marker()?;
400
401 self.buffer.clear();
402 self.num_values = 0;
403
404 self.writer.flush().map_err(Details::FlushWriter)?;
405
406 Ok(num_bytes)
407 }
408
409 pub fn into_inner(mut self) -> AvroResult<W> {
414 self.maybe_write_header()?;
415 self.flush()?;
416
417 let mut this = ManuallyDrop::new(self);
418
419 let _buffer = std::mem::take(&mut this.buffer);
421 let _user_metadata = std::mem::take(&mut this.user_metadata);
422 unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
424
425 let writer = unsafe { std::ptr::read(&this.writer) };
427
428 Ok(writer)
429 }
430
431 pub fn get_ref(&self) -> &W {
436 &self.writer
437 }
438
439 pub fn get_mut(&mut self) -> &mut W {
446 &mut self.writer
447 }
448
449 fn append_marker(&mut self) -> AvroResult<usize> {
451 self.writer
454 .write(&self.marker)
455 .map_err(|e| Details::WriteMarker(e).into())
456 }
457
458 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
460 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
461 }
462
463 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
465 self.writer
466 .write(bytes)
467 .map_err(|e| Details::WriteBytes(e).into())
468 }
469
470 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
473 if !self.has_header {
474 if key.starts_with("avro.") {
475 return Err(Details::InvalidMetadataKey(key).into());
476 }
477 self.user_metadata
478 .insert(key, Value::Bytes(value.as_ref().to_vec()));
479 Ok(())
480 } else {
481 Err(Details::FileHeaderAlreadyWritten.into())
482 }
483 }
484
485 fn header(&self) -> Result<Vec<u8>, Error> {
487 let schema_bytes = serde_json::to_string(self.schema)
488 .map_err(Details::ConvertJsonToString)?
489 .into_bytes();
490
491 let mut metadata = HashMap::with_capacity(2);
492 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
493 if self.codec != Codec::Null {
494 metadata.insert("avro.codec", self.codec.into());
495 }
496 match self.codec {
497 #[cfg(feature = "bzip")]
498 Codec::Bzip2(settings) => {
499 metadata.insert(
500 "avro.codec.compression_level",
501 Value::Bytes(vec![settings.compression_level]),
502 );
503 }
504 #[cfg(feature = "xz")]
505 Codec::Xz(settings) => {
506 metadata.insert(
507 "avro.codec.compression_level",
508 Value::Bytes(vec![settings.compression_level]),
509 );
510 }
511 #[cfg(feature = "zstandard")]
512 Codec::Zstandard(settings) => {
513 metadata.insert(
514 "avro.codec.compression_level",
515 Value::Bytes(vec![settings.compression_level]),
516 );
517 }
518 _ => {}
519 }
520
521 for (k, v) in &self.user_metadata {
522 metadata.insert(k.as_str(), v.clone());
523 }
524
525 let mut header = Vec::new();
526 header.extend_from_slice(AVRO_OBJECT_HEADER);
527 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
528 header.extend_from_slice(&self.marker);
529
530 Ok(header)
531 }
532
533 fn maybe_write_header(&mut self) -> AvroResult<usize> {
534 if !self.has_header {
535 let header = self.header()?;
536 let n = self.append_bytes(header.as_ref())?;
537 self.has_header = true;
538 Ok(n)
539 } else {
540 Ok(0)
541 }
542 }
543}
544
545impl<W: Write> Drop for Writer<'_, W> {
546 fn drop(&mut self) {
548 let _ = self.maybe_write_header();
549 let _ = self.flush();
550 }
551}
552
553fn write_avro_datum<T: Into<Value>, W: Write>(
558 schema: &Schema,
559 value: T,
560 writer: &mut W,
561) -> Result<(), Error> {
562 let avro = value.into();
563 if !avro.validate(schema) {
564 return Err(Details::Validation.into());
565 }
566 encode(&avro, schema, writer)?;
567 Ok(())
568}
569
570fn write_avro_datum_schemata<T: Into<Value>>(
571 schema: &Schema,
572 schemata: Vec<&Schema>,
573 value: T,
574 buffer: &mut Vec<u8>,
575) -> AvroResult<usize> {
576 let avro = value.into();
577 let rs = ResolvedSchema::try_from(schemata)?;
578 let names = rs.get_names();
579 let enclosing_namespace = schema.namespace();
580 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
581 return Err(Details::Validation.into());
582 }
583 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
584}
585
586pub struct GenericSingleObjectWriter {
590 buffer: Vec<u8>,
591 resolved: ResolvedOwnedSchema,
592}
593
594impl GenericSingleObjectWriter {
595 pub fn new_with_capacity(
596 schema: &Schema,
597 initial_buffer_cap: usize,
598 ) -> AvroResult<GenericSingleObjectWriter> {
599 let header_builder = RabinFingerprintHeader::from_schema(schema);
600 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
601 }
602
603 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
604 schema: &Schema,
605 initial_buffer_cap: usize,
606 header_builder: HB,
607 ) -> AvroResult<GenericSingleObjectWriter> {
608 let mut buffer = Vec::with_capacity(initial_buffer_cap);
609 let header = header_builder.build_header();
610 buffer.extend_from_slice(&header);
611
612 Ok(GenericSingleObjectWriter {
613 buffer,
614 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
615 })
616 }
617
618 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
619
620 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
622 let original_length = self.buffer.len();
623 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
624 Err(Details::IllegalSingleObjectWriterState.into())
625 } else {
626 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
627 writer
628 .write_all(&self.buffer)
629 .map_err(Details::WriteBytes)?;
630 let len = self.buffer.len();
631 self.buffer.truncate(original_length);
632 Ok(len)
633 }
634 }
635
636 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
638 self.write_value_ref(&v, writer)
639 }
640}
641
642pub struct SpecificSingleObjectWriter<T>
644where
645 T: AvroSchema,
646{
647 resolved: ResolvedOwnedSchema,
648 header: Vec<u8>,
649 _model: PhantomData<T>,
650}
651
652impl<T> SpecificSingleObjectWriter<T>
653where
654 T: AvroSchema,
655{
656 pub fn new() -> AvroResult<Self> {
657 let schema = T::get_schema();
658 let header = RabinFingerprintHeader::from_schema(&schema).build_header();
659 let resolved = ResolvedOwnedSchema::new(schema)?;
660 Ok(Self {
662 resolved,
663 header,
664 _model: PhantomData,
665 })
666 }
667
668 pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
669 let header = header_builder.build_header();
670 let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
671 Ok(Self {
672 resolved,
673 header,
674 _model: PhantomData,
675 })
676 }
677
678 #[deprecated(since = "0.22.0", note = "Use new() instead")]
680 pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
681 Self::new()
682 }
683}
684
685impl<T> SpecificSingleObjectWriter<T>
686where
687 T: AvroSchema + Into<Value>,
688{
689 pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
696 writer
697 .write_all(&self.header)
698 .map_err(Details::WriteBytes)?;
699 let value: Value = data.into();
700 let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
701 Ok(bytes + self.header.len())
702 }
703}
704
705impl<T> SpecificSingleObjectWriter<T>
706where
707 T: AvroSchema + Serialize,
708{
709 pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
716 writer
717 .write_all(&self.header)
718 .map_err(Details::WriteBytes)?;
719
720 let bytes = write_avro_datum_ref(
721 self.resolved.get_root_schema(),
722 self.resolved.get_names(),
723 data,
724 writer,
725 )?;
726
727 Ok(bytes + self.header.len())
728 }
729
730 pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
737 self.write_ref(&data, writer)
738 }
739}
740
741fn write_value_ref_owned_resolved<W: Write>(
742 resolved_schema: &ResolvedOwnedSchema,
743 value: &Value,
744 writer: &mut W,
745) -> AvroResult<usize> {
746 let root_schema = resolved_schema.get_root_schema();
747 if let Some(reason) = value.validate_internal(
748 root_schema,
749 resolved_schema.get_names(),
750 &root_schema.namespace(),
751 ) {
752 return Err(Details::ValidationWithReason {
753 value: value.clone(),
754 schema: root_schema.clone(),
755 reason,
756 }
757 .into());
758 }
759 encode_internal(
760 value,
761 root_schema,
762 resolved_schema.get_names(),
763 &root_schema.namespace(),
764 writer,
765 )
766}
767
768pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
774 let mut buffer = Vec::new();
775 write_avro_datum(schema, value, &mut buffer)?;
776 Ok(buffer)
777}
778
779pub fn write_avro_datum_ref<T: Serialize, W: Write>(
787 schema: &Schema,
788 names: &NamesRef,
789 data: &T,
790 writer: &mut W,
791) -> AvroResult<usize> {
792 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
793 data.serialize(&mut serializer)
794}
795
796pub fn to_avro_datum_schemata<T: Into<Value>>(
801 schema: &Schema,
802 schemata: Vec<&Schema>,
803 value: T,
804) -> AvroResult<Vec<u8>> {
805 let mut buffer = Vec::new();
806 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
807 Ok(buffer)
808}
809
810#[cfg(not(target_arch = "wasm32"))]
811fn generate_sync_marker() -> [u8; 16] {
812 let mut marker = [0_u8; 16];
813 std::iter::repeat_with(rand::random)
814 .take(16)
815 .enumerate()
816 .for_each(|(i, n)| marker[i] = n);
817 marker
818}
819
/// Produces 16 random bytes for use as the sync marker on wasm32, built from four
/// big-endian `quad_rand::rand()` words.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
830
831#[cfg(test)]
832mod tests {
833 use std::{cell::RefCell, rc::Rc};
834
835 use super::*;
836 use crate::{
837 Reader,
838 decimal::Decimal,
839 duration::{Days, Duration, Millis, Months},
840 headers::GlueSchemaUuidHeader,
841 rabin::Rabin,
842 schema::{DecimalSchema, FixedSchema, Name},
843 types::Record,
844 util::zig_i64,
845 };
846 use pretty_assertions::assert_eq;
847 use serde::{Deserialize, Serialize};
848 use uuid::Uuid;
849
850 use crate::schema::InnerDecimalSchema;
851 use crate::{codec::DeflateSettings, error::Details};
852 use apache_avro_test_helper::TestResult;
853
854 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
855
856 const SCHEMA: &str = r#"
857 {
858 "type": "record",
859 "name": "test",
860 "fields": [
861 {
862 "name": "a",
863 "type": "long",
864 "default": 42
865 },
866 {
867 "name": "b",
868 "type": "string"
869 }
870 ]
871 }
872 "#;
873
874 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
875
876 #[test]
877 fn test_to_avro_datum() -> TestResult {
878 let schema = Schema::parse_str(SCHEMA)?;
879 let mut record = Record::new(&schema).unwrap();
880 record.put("a", 27i64);
881 record.put("b", "foo");
882
883 let mut expected = Vec::new();
884 zig_i64(27, &mut expected)?;
885 zig_i64(3, &mut expected)?;
886 expected.extend([b'f', b'o', b'o']);
887
888 assert_eq!(to_avro_datum(&schema, record)?, expected);
889
890 Ok(())
891 }
892
893 #[test]
894 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
895 #[derive(Serialize)]
896 struct TestStruct {
897 a: i64,
898 b: String,
899 }
900
901 let schema = Schema::parse_str(SCHEMA)?;
902 let mut writer: Vec<u8> = Vec::new();
903 let data = TestStruct {
904 a: 27,
905 b: "foo".to_string(),
906 };
907
908 let mut expected = Vec::new();
909 zig_i64(27, &mut expected)?;
910 zig_i64(3, &mut expected)?;
911 expected.extend([b'f', b'o', b'o']);
912
913 let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
914
915 assert_eq!(bytes, expected.len());
916 assert_eq!(writer, expected);
917
918 Ok(())
919 }
920
921 #[test]
922 fn avro_rs_220_flush_write_header() -> TestResult {
923 let schema = Schema::parse_str(SCHEMA)?;
924
925 let mut writer = Writer::new(&schema, Vec::new())?;
927 writer.flush()?;
928 let result = writer.into_inner()?;
929 assert_eq!(result.len(), 147);
930
931 let mut writer = Writer::builder()
933 .has_header(true)
934 .schema(&schema)
935 .writer(Vec::new())
936 .build()?;
937 writer.flush()?;
938 let result = writer.into_inner()?;
939 assert_eq!(result.len(), 0);
940
941 Ok(())
942 }
943
944 #[test]
945 fn test_union_not_null() -> TestResult {
946 let schema = Schema::parse_str(UNION_SCHEMA)?;
947 let union = Value::Union(1, Box::new(Value::Long(3)));
948
949 let mut expected = Vec::new();
950 zig_i64(1, &mut expected)?;
951 zig_i64(3, &mut expected)?;
952
953 assert_eq!(to_avro_datum(&schema, union)?, expected);
954
955 Ok(())
956 }
957
958 #[test]
959 fn test_union_null() -> TestResult {
960 let schema = Schema::parse_str(UNION_SCHEMA)?;
961 let union = Value::Union(0, Box::new(Value::Null));
962
963 let mut expected = Vec::new();
964 zig_i64(0, &mut expected)?;
965
966 assert_eq!(to_avro_datum(&schema, union)?, expected);
967
968 Ok(())
969 }
970
971 fn logical_type_test<T: Into<Value> + Clone>(
972 schema_str: &'static str,
973
974 expected_schema: &Schema,
975 value: Value,
976
977 raw_schema: &Schema,
978 raw_value: T,
979 ) -> TestResult {
980 let schema = Schema::parse_str(schema_str)?;
981 assert_eq!(&schema, expected_schema);
982 let ser = to_avro_datum(&schema, value.clone())?;
984 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
985 assert_eq!(ser, raw_ser);
986
987 let mut r = ser.as_slice();
989 let de = crate::from_avro_datum(&schema, &mut r, None)?;
990 assert_eq!(de, value);
991 Ok(())
992 }
993
994 #[test]
995 fn date() -> TestResult {
996 logical_type_test(
997 r#"{"type": "int", "logicalType": "date"}"#,
998 &Schema::Date,
999 Value::Date(1_i32),
1000 &Schema::Int,
1001 1_i32,
1002 )
1003 }
1004
1005 #[test]
1006 fn time_millis() -> TestResult {
1007 logical_type_test(
1008 r#"{"type": "int", "logicalType": "time-millis"}"#,
1009 &Schema::TimeMillis,
1010 Value::TimeMillis(1_i32),
1011 &Schema::Int,
1012 1_i32,
1013 )
1014 }
1015
1016 #[test]
1017 fn time_micros() -> TestResult {
1018 logical_type_test(
1019 r#"{"type": "long", "logicalType": "time-micros"}"#,
1020 &Schema::TimeMicros,
1021 Value::TimeMicros(1_i64),
1022 &Schema::Long,
1023 1_i64,
1024 )
1025 }
1026
1027 #[test]
1028 fn timestamp_millis() -> TestResult {
1029 logical_type_test(
1030 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
1031 &Schema::TimestampMillis,
1032 Value::TimestampMillis(1_i64),
1033 &Schema::Long,
1034 1_i64,
1035 )
1036 }
1037
1038 #[test]
1039 fn timestamp_micros() -> TestResult {
1040 logical_type_test(
1041 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
1042 &Schema::TimestampMicros,
1043 Value::TimestampMicros(1_i64),
1044 &Schema::Long,
1045 1_i64,
1046 )
1047 }
1048
1049 #[test]
1050 fn decimal_fixed() -> TestResult {
1051 let size = 30;
1052 let fixed = FixedSchema {
1053 name: Name::new("decimal")?,
1054 aliases: None,
1055 doc: None,
1056 size,
1057 attributes: Default::default(),
1058 };
1059 let inner = InnerDecimalSchema::Fixed(fixed.clone());
1060 let value = vec![0u8; size];
1061 logical_type_test(
1062 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1063 &Schema::Decimal(DecimalSchema {
1064 precision: 20,
1065 scale: 5,
1066 inner,
1067 }),
1068 Value::Decimal(Decimal::from(value.clone())),
1069 &Schema::Fixed(fixed),
1070 Value::Fixed(size, value),
1071 )
1072 }
1073
1074 #[test]
1075 fn decimal_bytes() -> TestResult {
1076 let value = vec![0u8; 10];
1077 logical_type_test(
1078 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1079 &Schema::Decimal(DecimalSchema {
1080 precision: 4,
1081 scale: 3,
1082 inner: InnerDecimalSchema::Bytes,
1083 }),
1084 Value::Decimal(Decimal::from(value.clone())),
1085 &Schema::Bytes,
1086 value,
1087 )
1088 }
1089
1090 #[test]
1091 fn duration() -> TestResult {
1092 let inner = Schema::Fixed(FixedSchema {
1093 name: Name::new("duration")?,
1094 aliases: None,
1095 doc: None,
1096 size: 12,
1097 attributes: Default::default(),
1098 });
1099 let value = Value::Duration(Duration::new(
1100 Months::new(256),
1101 Days::new(512),
1102 Millis::new(1024),
1103 ));
1104 logical_type_test(
1105 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1106 &Schema::Duration(FixedSchema {
1107 name: Name::try_from("duration").expect("Name is valid"),
1108 aliases: None,
1109 doc: None,
1110 size: 12,
1111 attributes: Default::default(),
1112 }),
1113 value,
1114 &inner,
1115 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1116 )
1117 }
1118
1119 #[test]
1120 fn test_writer_append() -> TestResult {
1121 let schema = Schema::parse_str(SCHEMA)?;
1122 let mut writer = Writer::new(&schema, Vec::new())?;
1123
1124 let mut record = Record::new(&schema).unwrap();
1125 record.put("a", 27i64);
1126 record.put("b", "foo");
1127
1128 let n1 = writer.append_value(record.clone())?;
1129 let n2 = writer.append_value(record.clone())?;
1130 let n3 = writer.flush()?;
1131 let result = writer.into_inner()?;
1132
1133 assert_eq!(n1 + n2 + n3, result.len());
1134
1135 let mut data = Vec::new();
1136 zig_i64(27, &mut data)?;
1137 zig_i64(3, &mut data)?;
1138 data.extend(b"foo");
1139 data.extend(data.clone());
1140
1141 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1143 let last_data_byte = result.len() - 16;
1145 assert_eq!(
1146 &result[last_data_byte - data.len()..last_data_byte],
1147 data.as_slice()
1148 );
1149
1150 Ok(())
1151 }
1152
1153 #[test]
1154 fn test_writer_extend() -> TestResult {
1155 let schema = Schema::parse_str(SCHEMA)?;
1156 let mut writer = Writer::new(&schema, Vec::new())?;
1157
1158 let mut record = Record::new(&schema).unwrap();
1159 record.put("a", 27i64);
1160 record.put("b", "foo");
1161 let record_copy = record.clone();
1162 let records = vec![record, record_copy];
1163
1164 let n1 = writer.extend(records)?;
1165 let n2 = writer.flush()?;
1166 let result = writer.into_inner()?;
1167
1168 assert_eq!(n1 + n2, result.len());
1169
1170 let mut data = Vec::new();
1171 zig_i64(27, &mut data)?;
1172 zig_i64(3, &mut data)?;
1173 data.extend(b"foo");
1174 data.extend(data.clone());
1175
1176 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1178 let last_data_byte = result.len() - 16;
1180 assert_eq!(
1181 &result[last_data_byte - data.len()..last_data_byte],
1182 data.as_slice()
1183 );
1184
1185 Ok(())
1186 }
1187
1188 #[derive(Debug, Clone, Deserialize, Serialize)]
1189 struct TestSerdeSerialize {
1190 a: i64,
1191 b: String,
1192 }
1193
1194 #[test]
1195 fn test_writer_append_ser() -> TestResult {
1196 let schema = Schema::parse_str(SCHEMA)?;
1197 let mut writer = Writer::new(&schema, Vec::new())?;
1198
1199 let record = TestSerdeSerialize {
1200 a: 27,
1201 b: "foo".to_owned(),
1202 };
1203
1204 let n1 = writer.append_ser(record)?;
1205 let n2 = writer.flush()?;
1206 let result = writer.into_inner()?;
1207
1208 assert_eq!(n1 + n2, result.len());
1209
1210 let mut data = Vec::new();
1211 zig_i64(27, &mut data)?;
1212 zig_i64(3, &mut data)?;
1213 data.extend(b"foo");
1214
1215 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1217 let last_data_byte = result.len() - 16;
1219 assert_eq!(
1220 &result[last_data_byte - data.len()..last_data_byte],
1221 data.as_slice()
1222 );
1223
1224 Ok(())
1225 }
1226
1227 #[test]
1228 fn test_writer_extend_ser() -> TestResult {
1229 let schema = Schema::parse_str(SCHEMA)?;
1230 let mut writer = Writer::new(&schema, Vec::new())?;
1231
1232 let record = TestSerdeSerialize {
1233 a: 27,
1234 b: "foo".to_owned(),
1235 };
1236 let record_copy = record.clone();
1237 let records = vec![record, record_copy];
1238
1239 let n1 = writer.extend_ser(records)?;
1240 let n2 = writer.flush()?;
1241 let result = writer.into_inner()?;
1242
1243 assert_eq!(n1 + n2, result.len());
1244
1245 let mut data = Vec::new();
1246 zig_i64(27, &mut data)?;
1247 zig_i64(3, &mut data)?;
1248 data.extend(b"foo");
1249 data.extend(data.clone());
1250
1251 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1253 let last_data_byte = result.len() - 16;
1255 assert_eq!(
1256 &result[last_data_byte - data.len()..last_data_byte],
1257 data.as_slice()
1258 );
1259
1260 Ok(())
1261 }
1262
1263 fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1264 Writer::with_codec(
1265 schema,
1266 Vec::new(),
1267 Codec::Deflate(DeflateSettings::default()),
1268 )
1269 }
1270
1271 fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1272 Writer::builder()
1273 .writer(Vec::new())
1274 .schema(schema)
1275 .codec(Codec::Deflate(DeflateSettings::default()))
1276 .block_size(100)
1277 .build()
1278 }
1279
1280 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1281 let mut record = Record::new(schema).unwrap();
1282 record.put("a", 27i64);
1283 record.put("b", "foo");
1284
1285 let n1 = writer.append_value(record.clone())?;
1286 let n2 = writer.append_value(record.clone())?;
1287 let n3 = writer.flush()?;
1288 let result = writer.into_inner()?;
1289
1290 assert_eq!(n1 + n2 + n3, result.len());
1291
1292 let mut data = Vec::new();
1293 zig_i64(27, &mut data)?;
1294 zig_i64(3, &mut data)?;
1295 data.extend(b"foo");
1296 data.extend(data.clone());
1297 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1298
1299 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1301 let last_data_byte = result.len() - 16;
1303 assert_eq!(
1304 &result[last_data_byte - data.len()..last_data_byte],
1305 data.as_slice()
1306 );
1307
1308 Ok(())
1309 }
1310
1311 #[test]
1312 fn test_writer_with_codec() -> TestResult {
1313 let schema = Schema::parse_str(SCHEMA)?;
1314 let writer = make_writer_with_codec(&schema)?;
1315 check_writer(writer, &schema)
1316 }
1317
1318 #[test]
1319 fn test_writer_with_builder() -> TestResult {
1320 let schema = Schema::parse_str(SCHEMA)?;
1321 let writer = make_writer_with_builder(&schema)?;
1322 check_writer(writer, &schema)
1323 }
1324
1325 #[test]
1326 fn test_logical_writer() -> TestResult {
1327 const LOGICAL_TYPE_SCHEMA: &str = r#"
1328 {
1329 "type": "record",
1330 "name": "logical_type_test",
1331 "fields": [
1332 {
1333 "name": "a",
1334 "type": [
1335 "null",
1336 {
1337 "type": "long",
1338 "logicalType": "timestamp-micros"
1339 }
1340 ]
1341 }
1342 ]
1343 }
1344 "#;
1345 let codec = Codec::Deflate(DeflateSettings::default());
1346 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1347 let mut writer = Writer::builder()
1348 .schema(&schema)
1349 .codec(codec)
1350 .writer(Vec::new())
1351 .build()?;
1352
1353 let mut record1 = Record::new(&schema).unwrap();
1354 record1.put(
1355 "a",
1356 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1357 );
1358
1359 let mut record2 = Record::new(&schema).unwrap();
1360 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1361
1362 let n1 = writer.append_value(record1)?;
1363 let n2 = writer.append_value(record2)?;
1364 let n3 = writer.flush()?;
1365 let result = writer.into_inner()?;
1366
1367 assert_eq!(n1 + n2 + n3, result.len());
1368
1369 let mut data = Vec::new();
1370 zig_i64(1, &mut data)?;
1372 zig_i64(1234, &mut data)?;
1373
1374 zig_i64(0, &mut data)?;
1376 codec.compress(&mut data)?;
1377
1378 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1380 let last_data_byte = result.len() - 16;
1382 assert_eq!(
1383 &result[last_data_byte - data.len()..last_data_byte],
1384 data.as_slice()
1385 );
1386
1387 Ok(())
1388 }
1389
1390 #[test]
1391 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1392 let schema = Schema::parse_str(SCHEMA)?;
1393 let mut writer = Writer::new(&schema, Vec::new())?;
1394
1395 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1396 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1397 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1398 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1399
1400 let mut record = Record::new(&schema).unwrap();
1401 record.put("a", 27i64);
1402 record.put("b", "foo");
1403
1404 writer.append_value(record.clone())?;
1405 writer.append_value(record.clone())?;
1406 writer.flush()?;
1407 let result = writer.into_inner()?;
1408
1409 assert_eq!(result.len(), 244);
1410
1411 Ok(())
1412 }
1413
1414 #[test]
1415 fn test_avro_3881_metadata_empty_body() -> TestResult {
1416 let schema = Schema::parse_str(SCHEMA)?;
1417 let mut writer = Writer::new(&schema, Vec::new())?;
1418 writer.add_user_metadata("a".to_string(), "b")?;
1419 let result = writer.into_inner()?;
1420
1421 let reader = Reader::with_schema(&schema, &result[..])?;
1422 let mut expected = HashMap::new();
1423 expected.insert("a".to_string(), vec![b'b']);
1424 assert_eq!(reader.user_metadata(), &expected);
1425 assert_eq!(reader.into_iter().count(), 0);
1426
1427 Ok(())
1428 }
1429
1430 #[test]
1431 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1432 let schema = Schema::parse_str(SCHEMA)?;
1433 let mut writer = Writer::new(&schema, Vec::new())?;
1434
1435 let mut record = Record::new(&schema).unwrap();
1436 record.put("a", 27i64);
1437 record.put("b", "foo");
1438 writer.append_value(record.clone())?;
1439
1440 match writer
1441 .add_user_metadata("stringKey".to_string(), String::from("value2"))
1442 .map_err(Error::into_details)
1443 {
1444 Err(e @ Details::FileHeaderAlreadyWritten) => {
1445 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1446 }
1447 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1448 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1449 }
1450
1451 Ok(())
1452 }
1453
1454 #[test]
1455 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1456 let schema = Schema::parse_str(SCHEMA)?;
1457 let mut writer = Writer::new(&schema, Vec::new())?;
1458
1459 let key = "avro.stringKey".to_string();
1460 match writer
1461 .add_user_metadata(key.clone(), "value")
1462 .map_err(Error::into_details)
1463 {
1464 Err(ref e @ Details::InvalidMetadataKey(_)) => {
1465 assert_eq!(
1466 e.to_string(),
1467 format!(
1468 "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1469 )
1470 )
1471 }
1472 Err(e) => panic!(
1473 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1474 ),
1475 Ok(_) => {
1476 panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1477 }
1478 }
1479
1480 Ok(())
1481 }
1482
1483 #[test]
1484 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1485 let schema = Schema::parse_str(SCHEMA)?;
1486
1487 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1488 user_meta_data.insert(
1489 "stringKey".to_string(),
1490 Value::String("stringValue".to_string()),
1491 );
1492 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1493 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1494
1495 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1496 .writer(Vec::new())
1497 .schema(&schema)
1498 .user_metadata(user_meta_data.clone())
1499 .build()?;
1500
1501 assert_eq!(writer.user_metadata, user_meta_data);
1502
1503 Ok(())
1504 }
1505
1506 #[derive(Serialize, Clone)]
1507 struct TestSingleObjectWriter {
1508 a: i64,
1509 b: f64,
1510 c: Vec<String>,
1511 }
1512
1513 impl AvroSchema for TestSingleObjectWriter {
1514 fn get_schema() -> Schema {
1515 let schema = r#"
1516 {
1517 "type":"record",
1518 "name":"TestSingleObjectWrtierSerialize",
1519 "fields":[
1520 {
1521 "name":"a",
1522 "type":"long"
1523 },
1524 {
1525 "name":"b",
1526 "type":"double"
1527 },
1528 {
1529 "name":"c",
1530 "type":{
1531 "type":"array",
1532 "items":"string"
1533 }
1534 }
1535 ]
1536 }
1537 "#;
1538 Schema::parse_str(schema).unwrap()
1539 }
1540 }
1541
1542 impl From<TestSingleObjectWriter> for Value {
1543 fn from(obj: TestSingleObjectWriter) -> Value {
1544 Value::Record(vec![
1545 ("a".into(), obj.a.into()),
1546 ("b".into(), obj.b.into()),
1547 (
1548 "c".into(),
1549 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1550 ),
1551 ])
1552 }
1553 }
1554
1555 #[test]
1556 fn test_single_object_writer() -> TestResult {
1557 let mut buf: Vec<u8> = Vec::new();
1558 let obj = TestSingleObjectWriter {
1559 a: 300,
1560 b: 34.555,
1561 c: vec!["cat".into(), "dog".into()],
1562 };
1563 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1564 &TestSingleObjectWriter::get_schema(),
1565 1024,
1566 )
1567 .expect("Should resolve schema");
1568 let value = obj.into();
1569 let written_bytes = writer
1570 .write_value_ref(&value, &mut buf)
1571 .expect("Error serializing properly");
1572
1573 assert!(buf.len() > 10, "no bytes written");
1574 assert_eq!(buf.len(), written_bytes);
1575 assert_eq!(buf[0], 0xC3);
1576 assert_eq!(buf[1], 0x01);
1577 assert_eq!(
1578 &buf[2..10],
1579 &TestSingleObjectWriter::get_schema()
1580 .fingerprint::<Rabin>()
1581 .bytes[..]
1582 );
1583 let mut msg_binary = Vec::new();
1584 encode(
1585 &value,
1586 &TestSingleObjectWriter::get_schema(),
1587 &mut msg_binary,
1588 )
1589 .expect("encode should have failed by here as a dependency of any writing");
1590 assert_eq!(&buf[10..], &msg_binary[..]);
1591
1592 Ok(())
1593 }
1594
1595 #[test]
1596 fn test_single_object_writer_with_header_builder() -> TestResult {
1597 let mut buf: Vec<u8> = Vec::new();
1598 let obj = TestSingleObjectWriter {
1599 a: 300,
1600 b: 34.555,
1601 c: vec!["cat".into(), "dog".into()],
1602 };
1603 let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1604 let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1605 let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1606 &TestSingleObjectWriter::get_schema(),
1607 1024,
1608 header_builder,
1609 )
1610 .expect("Should resolve schema");
1611 let value = obj.into();
1612 writer
1613 .write_value_ref(&value, &mut buf)
1614 .expect("Error serializing properly");
1615
1616 assert_eq!(buf[0], 0x03);
1617 assert_eq!(buf[1], 0x00);
1618 assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1619 Ok(())
1620 }
1621
1622 #[test]
1623 fn test_writer_parity() -> TestResult {
1624 let obj1 = TestSingleObjectWriter {
1625 a: 300,
1626 b: 34.555,
1627 c: vec!["cat".into(), "dog".into()],
1628 };
1629
1630 let mut buf1: Vec<u8> = Vec::new();
1631 let mut buf2: Vec<u8> = Vec::new();
1632 let mut buf3: Vec<u8> = Vec::new();
1633 let mut buf4: Vec<u8> = Vec::new();
1634
1635 let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1636 &TestSingleObjectWriter::get_schema(),
1637 1024,
1638 )
1639 .expect("Should resolve schema");
1640 let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
1641 .expect("Resolved should pass");
1642 specific_writer
1643 .write_ref(&obj1, &mut buf1)
1644 .expect("Serialization expected");
1645 specific_writer
1646 .write_ref(&obj1, &mut buf2)
1647 .expect("Serialization expected");
1648 specific_writer
1649 .write_value(obj1.clone(), &mut buf3)
1650 .expect("Serialization expected");
1651
1652 generic_writer
1653 .write_value(obj1.into(), &mut buf4)
1654 .expect("Serialization expected");
1655
1656 assert_eq!(buf1, buf2);
1657 assert_eq!(buf2, buf3);
1658 assert_eq!(buf3, buf4);
1659
1660 Ok(())
1661 }
1662
1663 #[test]
1664 fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1665 const SCHEMA: &str = r#"
1666 {
1667 "type": "record",
1668 "name": "Conference",
1669 "fields": [
1670 {"type": "string", "name": "name"},
1671 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1672 ]
1673 }"#;
1674
1675 #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1676 pub struct Conference {
1677 pub name: String,
1678 pub time: Option<i64>,
1679 }
1680
1681 let conf = Conference {
1682 name: "RustConf".to_string(),
1683 time: Some(1234567890),
1684 };
1685
1686 let schema = Schema::parse_str(SCHEMA)?;
1687 let mut writer = Writer::new(&schema, Vec::new())?;
1688
1689 let bytes = writer.append_ser(conf)?;
1690
1691 assert_eq!(182, bytes);
1692
1693 Ok(())
1694 }
1695
1696 #[test]
1697 fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1698 const SCHEMA: &str = r#"
1699 {
1700 "type": "record",
1701 "name": "Conference",
1702 "fields": [
1703 {"type": "string", "name": "name"},
1704 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1705 ]
1706 }"#;
1707
1708 #[derive(Debug, PartialEq, Clone, Serialize)]
1709 pub struct Conference {
1710 pub name: String,
1711 pub time: Option<f64>, }
1713
1714 let conf = Conference {
1715 name: "RustConf".to_string(),
1716 time: Some(12345678.90),
1717 };
1718
1719 let schema = Schema::parse_str(SCHEMA)?;
1720 let mut writer = Writer::new(&schema, Vec::new())?;
1721
1722 match writer.append_ser(conf) {
1723 Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1724 Err(e) => {
1725 assert_eq!(
1726 e.to_string(),
1727 r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1728 );
1729 }
1730 }
1731 Ok(())
1732 }
1733
1734 #[test]
1735 fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1736 const SCHEMA: &str = r#"
1737 {
1738 "type": "record",
1739 "name": "ExampleSchema",
1740 "fields": [
1741 {"name": "exampleField", "type": "string"}
1742 ]
1743 }
1744 "#;
1745
1746 #[derive(Clone, Default)]
1747 struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1748
1749 impl TestBuffer {
1750 fn len(&self) -> usize {
1751 self.0.borrow().len()
1752 }
1753 }
1754
1755 impl Write for TestBuffer {
1756 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1757 self.0.borrow_mut().write(buf)
1758 }
1759
1760 fn flush(&mut self) -> std::io::Result<()> {
1761 Ok(())
1762 }
1763 }
1764
1765 let shared_buffer = TestBuffer::default();
1766
1767 let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1768
1769 let schema = Schema::parse_str(SCHEMA)?;
1770
1771 let mut writer = Writer::new(&schema, buffered_writer)?;
1772
1773 let mut record = Record::new(writer.schema()).unwrap();
1774 record.put("exampleField", "value");
1775
1776 writer.append_value(record)?;
1777 writer.flush()?;
1778
1779 assert_eq!(
1780 shared_buffer.len(),
1781 151,
1782 "the test buffer was not fully written to after Writer::flush was called"
1783 );
1784
1785 Ok(())
1786 }
1787
1788 #[test]
1789 fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
1790 #[derive(Serialize)]
1791 struct Recursive {
1792 field: bool,
1793 recurse: Option<Box<Recursive>>,
1794 }
1795
1796 impl AvroSchema for Recursive {
1797 fn get_schema() -> Schema {
1798 Schema::parse_str(
1799 r#"{
1800 "name": "Recursive",
1801 "type": "record",
1802 "fields": [
1803 { "name": "field", "type": "boolean" },
1804 { "name": "recurse", "type": ["null", "Recursive"] }
1805 ]
1806 }"#,
1807 )
1808 .unwrap()
1809 }
1810 }
1811
1812 let mut buffer = Vec::new();
1813 let writer = SpecificSingleObjectWriter::new()?;
1814
1815 writer.write(
1816 Recursive {
1817 field: true,
1818 recurse: Some(Box::new(Recursive {
1819 field: false,
1820 recurse: None,
1821 })),
1822 },
1823 &mut buffer,
1824 )?;
1825 assert_eq!(
1826 buffer,
1827 &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
1828 );
1829
1830 Ok(())
1831 }
1832
1833 #[test]
1834 fn avro_rs_310_append_unvalidated_value() -> TestResult {
1835 let schema = Schema::String;
1836 let value = Value::Int(1);
1837
1838 let mut writer = Writer::new(&schema, Vec::new())?;
1839 writer.unvalidated_append_value_ref(&value)?;
1840 writer.unvalidated_append_value(value)?;
1841 let buffer = writer.into_inner()?;
1842
1843 assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);
1845
1846 let mut writer = Writer::new(&schema, Vec::new())?;
1847 let value = Value::Int(1);
1848 let err = writer.append_value_ref(&value).unwrap_err();
1849 assert_eq!(
1850 err.to_string(),
1851 "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1852 );
1853 let err = writer.append_value(value).unwrap_err();
1854 assert_eq!(
1855 err.to_string(),
1856 "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1857 );
1858
1859 Ok(())
1860 }
1861}