1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 serde::{AvroSchema, ser_schema::SchemaAwareWriteSerializer},
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default block threshold: once the encoded (not yet compressed) block buffer
/// reaches this many bytes, the block is flushed automatically.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes that open every Avro object container file (`"Obj"` + version 1).
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
35
/// Writes Avro values in the Avro object container file format.
///
/// Values are encoded into an in-memory block buffer. When the buffer reaches
/// `block_size` bytes (or on an explicit flush) the block is compressed with
/// `codec` and written to `writer`, followed by the sync `marker`.
// NOTE: field declaration order is also drop order, and `into_inner` drops the
// owned fields by hand; keep it in sync when adding fields with drop glue.
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is encoded with.
    schema: &'a Schema,
    /// Destination of the container bytes.
    writer: W,
    /// Named types resolved from `schema`, or from the `schemata` given at construction.
    resolved_schema: ResolvedSchema<'a>,
    /// Codec applied to each block before it is written.
    codec: Codec,
    /// Encoded-buffer length (in bytes, before compression) that triggers an automatic flush.
    block_size: usize,
    /// Encoded values of the current block that have not been written yet.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// Sync marker written at the end of the header and after every block.
    marker: [u8; 16],
    /// Whether the header has been written (or is not wanted, when appending).
    has_header: bool,
    /// Extra entries written into the header metadata map.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56 #[builder]
57 pub fn builder(
58 schema: &'a Schema,
59 schemata: Option<Vec<&'a Schema>>,
60 writer: W,
61 #[builder(default = Codec::Null)] codec: Codec,
62 #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63 #[builder(default = generate_sync_marker())] marker: [u8; 16],
64 #[builder(default = false)]
68 has_header: bool,
69 #[builder(default)] user_metadata: HashMap<String, Value>,
70 ) -> AvroResult<Self> {
71 let resolved_schema = if let Some(schemata) = schemata {
72 ResolvedSchema::try_from(schemata)?
73 } else {
74 ResolvedSchema::try_from(schema)?
75 };
76 Ok(Self {
77 schema,
78 writer,
79 resolved_schema,
80 codec,
81 block_size,
82 buffer: Vec::with_capacity(block_size),
83 num_values: 0,
84 marker,
85 has_header,
86 user_metadata,
87 })
88 }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96 Writer::with_codec(schema, writer, Codec::Null)
97 }
98
99 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102 Self::builder()
103 .schema(schema)
104 .writer(writer)
105 .codec(codec)
106 .build()
107 }
108
109 pub fn with_schemata(
114 schema: &'a Schema,
115 schemata: Vec<&'a Schema>,
116 writer: W,
117 codec: Codec,
118 ) -> AvroResult<Self> {
119 Self::builder()
120 .schema(schema)
121 .schemata(schemata)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132 }
133
134 pub fn append_to_with_codec(
137 schema: &'a Schema,
138 writer: W,
139 codec: Codec,
140 marker: [u8; 16],
141 ) -> AvroResult<Self> {
142 Self::builder()
143 .schema(schema)
144 .writer(writer)
145 .codec(codec)
146 .marker(marker)
147 .has_header(true)
148 .build()
149 }
150
151 pub fn append_to_with_codec_schemata(
154 schema: &'a Schema,
155 schemata: Vec<&'a Schema>,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .schemata(schemata)
163 .writer(writer)
164 .codec(codec)
165 .marker(marker)
166 .has_header(true)
167 .build()
168 }
169
170 pub fn schema(&self) -> &'a Schema {
172 self.schema
173 }
174
175 #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
177 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
178 self.append_value(value)
179 }
180
181 pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189 let avro = value.into();
190 self.append_value_ref(&avro)
191 }
192
193 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201 if let Some(reason) = value.validate_internal(
202 self.schema,
203 self.resolved_schema.get_names(),
204 &self.schema.namespace(),
205 ) {
206 return Err(Details::ValidationWithReason {
207 value: value.clone(),
208 schema: self.schema.clone(),
209 reason,
210 }
211 .into());
212 }
213 self.unvalidated_append_value_ref(value)
214 }
215
216 pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228 let value = value.into();
229 self.unvalidated_append_value_ref(&value)
230 }
231
232 pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244 let n = self.maybe_write_header()?;
245 encode_internal(
246 value,
247 self.schema,
248 self.resolved_schema.get_names(),
249 &self.schema.namespace(),
250 &mut self.buffer,
251 )?;
252
253 self.num_values += 1;
254
255 if self.buffer.len() >= self.block_size {
256 return self.flush().map(|b| b + n);
257 }
258
259 Ok(n)
260 }
261
262 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272 let n = self.maybe_write_header()?;
273
274 let mut serializer = SchemaAwareWriteSerializer::new(
275 &mut self.buffer,
276 self.schema,
277 self.resolved_schema.get_names(),
278 None,
279 );
280 value.serialize(&mut serializer)?;
281 self.num_values += 1;
282
283 if self.buffer.len() >= self.block_size {
284 return self.flush().map(|b| b + n);
285 }
286
287 Ok(n)
288 }
289
290 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
298 where
299 I: IntoIterator<Item = T>,
300 {
301 let mut num_bytes = 0;
316 for value in values {
317 num_bytes += self.append_value(value)?;
318 }
319 num_bytes += self.flush()?;
320
321 Ok(num_bytes)
322 }
323
324 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
333 where
334 I: IntoIterator<Item = T>,
335 {
336 let mut num_bytes = 0;
351 for value in values {
352 num_bytes += self.append_ser(value)?;
353 }
354 num_bytes += self.flush()?;
355
356 Ok(num_bytes)
357 }
358
359 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
367 let mut num_bytes = 0;
368 for value in values {
369 num_bytes += self.append_value_ref(value)?;
370 }
371 num_bytes += self.flush()?;
372
373 Ok(num_bytes)
374 }
375
376 pub fn flush(&mut self) -> AvroResult<usize> {
384 let mut num_bytes = self.maybe_write_header()?;
385 if self.num_values == 0 {
386 return Ok(num_bytes);
387 }
388
389 self.codec.compress(&mut self.buffer)?;
390
391 let num_values = self.num_values;
392 let stream_len = self.buffer.len();
393
394 num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
395 + self.append_raw(&stream_len.into(), &Schema::Long)?
396 + self
397 .writer
398 .write(self.buffer.as_ref())
399 .map_err(Details::WriteBytes)?
400 + self.append_marker()?;
401
402 self.buffer.clear();
403 self.num_values = 0;
404
405 self.writer.flush().map_err(Details::FlushWriter)?;
406
407 Ok(num_bytes)
408 }
409
410 pub fn into_inner(mut self) -> AvroResult<W> {
415 self.maybe_write_header()?;
416 self.flush()?;
417
418 let mut this = ManuallyDrop::new(self);
419
420 let _buffer = std::mem::take(&mut this.buffer);
422 let _user_metadata = std::mem::take(&mut this.user_metadata);
423 unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
425
426 let writer = unsafe { std::ptr::read(&this.writer) };
428
429 Ok(writer)
430 }
431
432 pub fn get_ref(&self) -> &W {
437 &self.writer
438 }
439
440 pub fn get_mut(&mut self) -> &mut W {
447 &mut self.writer
448 }
449
450 fn append_marker(&mut self) -> AvroResult<usize> {
452 self.writer
455 .write(&self.marker)
456 .map_err(|e| Details::WriteMarker(e).into())
457 }
458
459 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
461 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
462 }
463
464 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
466 self.writer
467 .write(bytes)
468 .map_err(|e| Details::WriteBytes(e).into())
469 }
470
471 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
474 if !self.has_header {
475 if key.starts_with("avro.") {
476 return Err(Details::InvalidMetadataKey(key).into());
477 }
478 self.user_metadata
479 .insert(key, Value::Bytes(value.as_ref().to_vec()));
480 Ok(())
481 } else {
482 Err(Details::FileHeaderAlreadyWritten.into())
483 }
484 }
485
486 fn header(&self) -> Result<Vec<u8>, Error> {
488 let schema_bytes = serde_json::to_string(self.schema)
489 .map_err(Details::ConvertJsonToString)?
490 .into_bytes();
491
492 let mut metadata = HashMap::with_capacity(2);
493 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
494 if self.codec != Codec::Null {
495 metadata.insert("avro.codec", self.codec.into());
496 }
497 match self.codec {
498 #[cfg(feature = "bzip")]
499 Codec::Bzip2(settings) => {
500 metadata.insert(
501 "avro.codec.compression_level",
502 Value::Bytes(vec![settings.compression_level]),
503 );
504 }
505 #[cfg(feature = "xz")]
506 Codec::Xz(settings) => {
507 metadata.insert(
508 "avro.codec.compression_level",
509 Value::Bytes(vec![settings.compression_level]),
510 );
511 }
512 #[cfg(feature = "zstandard")]
513 Codec::Zstandard(settings) => {
514 metadata.insert(
515 "avro.codec.compression_level",
516 Value::Bytes(vec![settings.compression_level]),
517 );
518 }
519 _ => {}
520 }
521
522 for (k, v) in &self.user_metadata {
523 metadata.insert(k.as_str(), v.clone());
524 }
525
526 let mut header = Vec::new();
527 header.extend_from_slice(AVRO_OBJECT_HEADER);
528 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
529 header.extend_from_slice(&self.marker);
530
531 Ok(header)
532 }
533
534 fn maybe_write_header(&mut self) -> AvroResult<usize> {
535 if !self.has_header {
536 let header = self.header()?;
537 let n = self.append_bytes(header.as_ref())?;
538 self.has_header = true;
539 Ok(n)
540 } else {
541 Ok(0)
542 }
543 }
544}
545
546impl<W: Write> Drop for Writer<'_, W> {
547 fn drop(&mut self) {
549 let _ = self.maybe_write_header();
550 let _ = self.flush();
551 }
552}
553
554fn write_avro_datum<T: Into<Value>, W: Write>(
560 schema: &Schema,
561 value: T,
562 writer: &mut W,
563) -> Result<(), Error> {
564 let avro = value.into();
565 if !avro.validate(schema) {
566 return Err(Details::Validation.into());
567 }
568 encode(&avro, schema, writer)?;
569 Ok(())
570}
571
572fn write_avro_datum_schemata<T: Into<Value>>(
573 schema: &Schema,
574 schemata: Vec<&Schema>,
575 value: T,
576 buffer: &mut Vec<u8>,
577) -> AvroResult<usize> {
578 let avro = value.into();
579 let rs = ResolvedSchema::try_from(schemata)?;
580 let names = rs.get_names();
581 let enclosing_namespace = schema.namespace();
582 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
583 return Err(Details::Validation.into());
584 }
585 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
586}
587
/// Writes [`Value`]s in the Avro single-object encoding: a header followed by the datum.
pub struct GenericSingleObjectWriter {
    /// Always starts with the header; each write temporarily appends the encoded
    /// datum after it and truncates back to the header afterwards.
    buffer: Vec<u8>,
    /// Owned, resolved schema the values are validated and encoded against.
    resolved: ResolvedOwnedSchema,
}
595
596impl GenericSingleObjectWriter {
597 pub fn new_with_capacity(
598 schema: &Schema,
599 initial_buffer_cap: usize,
600 ) -> AvroResult<GenericSingleObjectWriter> {
601 let header_builder = RabinFingerprintHeader::from_schema(schema);
602 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
603 }
604
605 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
606 schema: &Schema,
607 initial_buffer_cap: usize,
608 header_builder: HB,
609 ) -> AvroResult<GenericSingleObjectWriter> {
610 let mut buffer = Vec::with_capacity(initial_buffer_cap);
611 let header = header_builder.build_header();
612 buffer.extend_from_slice(&header);
613
614 Ok(GenericSingleObjectWriter {
615 buffer,
616 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
617 })
618 }
619
620 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
621
622 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
624 let original_length = self.buffer.len();
625 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
626 Err(Details::IllegalSingleObjectWriterState.into())
627 } else {
628 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
629 writer
630 .write_all(&self.buffer)
631 .map_err(Details::WriteBytes)?;
632 let len = self.buffer.len();
633 self.buffer.truncate(original_length);
634 Ok(len)
635 }
636 }
637
638 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
640 self.write_value_ref(&v, writer)
641 }
642}
643
/// Writes values of a type `T` with a compile-time Avro schema in the
/// single-object encoding (header followed by the datum).
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Resolved form of `T::get_schema()`.
    resolved: ResolvedOwnedSchema,
    /// Header bytes written before every datum.
    header: Vec<u8>,
    /// Ties the writer to `T` without storing a value of it.
    _model: PhantomData<T>,
}
653
654impl<T> SpecificSingleObjectWriter<T>
655where
656 T: AvroSchema,
657{
658 pub fn new() -> AvroResult<Self> {
659 let schema = T::get_schema();
660 let header = RabinFingerprintHeader::from_schema(&schema).build_header();
661 let resolved = ResolvedOwnedSchema::new(schema)?;
662 Ok(Self {
664 resolved,
665 header,
666 _model: PhantomData,
667 })
668 }
669
670 pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
671 let header = header_builder.build_header();
672 let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
673 Ok(Self {
674 resolved,
675 header,
676 _model: PhantomData,
677 })
678 }
679
680 #[deprecated(since = "0.22.0", note = "Use new() instead")]
682 pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
683 Self::new()
684 }
685}
686
687impl<T> SpecificSingleObjectWriter<T>
688where
689 T: AvroSchema + Into<Value>,
690{
691 pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
698 writer
699 .write_all(&self.header)
700 .map_err(Details::WriteBytes)?;
701 let value: Value = data.into();
702 let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
703 Ok(bytes + self.header.len())
704 }
705}
706
707impl<T> SpecificSingleObjectWriter<T>
708where
709 T: AvroSchema + Serialize,
710{
711 pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
718 writer
719 .write_all(&self.header)
720 .map_err(Details::WriteBytes)?;
721
722 let bytes = write_avro_datum_ref(
723 self.resolved.get_root_schema(),
724 self.resolved.get_names(),
725 data,
726 writer,
727 )?;
728
729 Ok(bytes + self.header.len())
730 }
731
732 pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
739 self.write_ref(&data, writer)
740 }
741}
742
743fn write_value_ref_owned_resolved<W: Write>(
744 resolved_schema: &ResolvedOwnedSchema,
745 value: &Value,
746 writer: &mut W,
747) -> AvroResult<usize> {
748 let root_schema = resolved_schema.get_root_schema();
749 if let Some(reason) = value.validate_internal(
750 root_schema,
751 resolved_schema.get_names(),
752 &root_schema.namespace(),
753 ) {
754 return Err(Details::ValidationWithReason {
755 value: value.clone(),
756 schema: root_schema.clone(),
757 reason,
758 }
759 .into());
760 }
761 encode_internal(
762 value,
763 root_schema,
764 resolved_schema.get_names(),
765 &root_schema.namespace(),
766 writer,
767 )
768}
769
770pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
777 let mut buffer = Vec::new();
778 write_avro_datum(schema, value, &mut buffer)?;
779 Ok(buffer)
780}
781
782pub fn write_avro_datum_ref<T: Serialize, W: Write>(
789 schema: &Schema,
790 names: &NamesRef,
791 data: &T,
792 writer: &mut W,
793) -> AvroResult<usize> {
794 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
795 data.serialize(&mut serializer)
796}
797
798pub fn to_avro_datum_schemata<T: Into<Value>>(
803 schema: &Schema,
804 schemata: Vec<&Schema>,
805 value: T,
806) -> AvroResult<Vec<u8>> {
807 let mut buffer = Vec::new();
808 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
809 Ok(buffer)
810}
811
812#[cfg(not(target_arch = "wasm32"))]
813fn generate_sync_marker() -> [u8; 16] {
814 let mut marker = [0_u8; 16];
815 std::iter::repeat_with(rand::random)
816 .take(16)
817 .enumerate()
818 .for_each(|(i, n)| marker[i] = n);
819 marker
820}
821
/// Generates a random 16-byte sync marker on wasm32 from four `quad_rand::rand()`
/// values, each laid out in big-endian byte order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
832
833#[cfg(test)]
834mod tests {
835 use std::{cell::RefCell, rc::Rc};
836
837 use super::*;
838 use crate::{
839 Reader,
840 decimal::Decimal,
841 duration::{Days, Duration, Millis, Months},
842 headers::GlueSchemaUuidHeader,
843 rabin::Rabin,
844 schema::{DecimalSchema, FixedSchema, Name},
845 types::Record,
846 util::zig_i64,
847 };
848 use pretty_assertions::assert_eq;
849 use serde::{Deserialize, Serialize};
850 use uuid::Uuid;
851
852 use crate::schema::InnerDecimalSchema;
853 use crate::{codec::DeflateSettings, error::Details};
854 use apache_avro_test_helper::TestResult;
855
856 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
857
858 const SCHEMA: &str = r#"
859 {
860 "type": "record",
861 "name": "test",
862 "fields": [
863 {
864 "name": "a",
865 "type": "long",
866 "default": 42
867 },
868 {
869 "name": "b",
870 "type": "string"
871 }
872 ]
873 }
874 "#;
875
876 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
877
878 #[test]
879 fn test_to_avro_datum() -> TestResult {
880 let schema = Schema::parse_str(SCHEMA)?;
881 let mut record = Record::new(&schema).unwrap();
882 record.put("a", 27i64);
883 record.put("b", "foo");
884
885 let mut expected = Vec::new();
886 zig_i64(27, &mut expected)?;
887 zig_i64(3, &mut expected)?;
888 expected.extend([b'f', b'o', b'o']);
889
890 assert_eq!(to_avro_datum(&schema, record)?, expected);
891
892 Ok(())
893 }
894
895 #[test]
896 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
897 #[derive(Serialize)]
898 struct TestStruct {
899 a: i64,
900 b: String,
901 }
902
903 let schema = Schema::parse_str(SCHEMA)?;
904 let mut writer: Vec<u8> = Vec::new();
905 let data = TestStruct {
906 a: 27,
907 b: "foo".to_string(),
908 };
909
910 let mut expected = Vec::new();
911 zig_i64(27, &mut expected)?;
912 zig_i64(3, &mut expected)?;
913 expected.extend([b'f', b'o', b'o']);
914
915 let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
916
917 assert_eq!(bytes, expected.len());
918 assert_eq!(writer, expected);
919
920 Ok(())
921 }
922
923 #[test]
924 fn avro_rs_220_flush_write_header() -> TestResult {
925 let schema = Schema::parse_str(SCHEMA)?;
926
927 let mut writer = Writer::new(&schema, Vec::new())?;
929 writer.flush()?;
930 let result = writer.into_inner()?;
931 assert_eq!(result.len(), 147);
932
933 let mut writer = Writer::builder()
935 .has_header(true)
936 .schema(&schema)
937 .writer(Vec::new())
938 .build()?;
939 writer.flush()?;
940 let result = writer.into_inner()?;
941 assert_eq!(result.len(), 0);
942
943 Ok(())
944 }
945
946 #[test]
947 fn test_union_not_null() -> TestResult {
948 let schema = Schema::parse_str(UNION_SCHEMA)?;
949 let union = Value::Union(1, Box::new(Value::Long(3)));
950
951 let mut expected = Vec::new();
952 zig_i64(1, &mut expected)?;
953 zig_i64(3, &mut expected)?;
954
955 assert_eq!(to_avro_datum(&schema, union)?, expected);
956
957 Ok(())
958 }
959
960 #[test]
961 fn test_union_null() -> TestResult {
962 let schema = Schema::parse_str(UNION_SCHEMA)?;
963 let union = Value::Union(0, Box::new(Value::Null));
964
965 let mut expected = Vec::new();
966 zig_i64(0, &mut expected)?;
967
968 assert_eq!(to_avro_datum(&schema, union)?, expected);
969
970 Ok(())
971 }
972
973 fn logical_type_test<T: Into<Value> + Clone>(
974 schema_str: &'static str,
975
976 expected_schema: &Schema,
977 value: Value,
978
979 raw_schema: &Schema,
980 raw_value: T,
981 ) -> TestResult {
982 let schema = Schema::parse_str(schema_str)?;
983 assert_eq!(&schema, expected_schema);
984 let ser = to_avro_datum(&schema, value.clone())?;
986 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
987 assert_eq!(ser, raw_ser);
988
989 let mut r = ser.as_slice();
991 let de = crate::from_avro_datum(&schema, &mut r, None)?;
992 assert_eq!(de, value);
993 Ok(())
994 }
995
996 #[test]
997 fn date() -> TestResult {
998 logical_type_test(
999 r#"{"type": "int", "logicalType": "date"}"#,
1000 &Schema::Date,
1001 Value::Date(1_i32),
1002 &Schema::Int,
1003 1_i32,
1004 )
1005 }
1006
1007 #[test]
1008 fn time_millis() -> TestResult {
1009 logical_type_test(
1010 r#"{"type": "int", "logicalType": "time-millis"}"#,
1011 &Schema::TimeMillis,
1012 Value::TimeMillis(1_i32),
1013 &Schema::Int,
1014 1_i32,
1015 )
1016 }
1017
1018 #[test]
1019 fn time_micros() -> TestResult {
1020 logical_type_test(
1021 r#"{"type": "long", "logicalType": "time-micros"}"#,
1022 &Schema::TimeMicros,
1023 Value::TimeMicros(1_i64),
1024 &Schema::Long,
1025 1_i64,
1026 )
1027 }
1028
1029 #[test]
1030 fn timestamp_millis() -> TestResult {
1031 logical_type_test(
1032 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
1033 &Schema::TimestampMillis,
1034 Value::TimestampMillis(1_i64),
1035 &Schema::Long,
1036 1_i64,
1037 )
1038 }
1039
1040 #[test]
1041 fn timestamp_micros() -> TestResult {
1042 logical_type_test(
1043 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
1044 &Schema::TimestampMicros,
1045 Value::TimestampMicros(1_i64),
1046 &Schema::Long,
1047 1_i64,
1048 )
1049 }
1050
1051 #[test]
1052 fn decimal_fixed() -> TestResult {
1053 let size = 30;
1054 let fixed = FixedSchema {
1055 name: Name::new("decimal")?,
1056 aliases: None,
1057 doc: None,
1058 size,
1059 default: None,
1060 attributes: Default::default(),
1061 };
1062 let inner = InnerDecimalSchema::Fixed(fixed.clone());
1063 let value = vec![0u8; size];
1064 logical_type_test(
1065 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1066 &Schema::Decimal(DecimalSchema {
1067 precision: 20,
1068 scale: 5,
1069 inner,
1070 }),
1071 Value::Decimal(Decimal::from(value.clone())),
1072 &Schema::Fixed(fixed),
1073 Value::Fixed(size, value),
1074 )
1075 }
1076
1077 #[test]
1078 fn decimal_bytes() -> TestResult {
1079 let value = vec![0u8; 10];
1080 logical_type_test(
1081 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1082 &Schema::Decimal(DecimalSchema {
1083 precision: 4,
1084 scale: 3,
1085 inner: InnerDecimalSchema::Bytes,
1086 }),
1087 Value::Decimal(Decimal::from(value.clone())),
1088 &Schema::Bytes,
1089 value,
1090 )
1091 }
1092
1093 #[test]
1094 fn duration() -> TestResult {
1095 let inner = Schema::Fixed(FixedSchema {
1096 name: Name::new("duration")?,
1097 aliases: None,
1098 doc: None,
1099 size: 12,
1100 default: None,
1101 attributes: Default::default(),
1102 });
1103 let value = Value::Duration(Duration::new(
1104 Months::new(256),
1105 Days::new(512),
1106 Millis::new(1024),
1107 ));
1108 logical_type_test(
1109 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1110 &Schema::Duration(FixedSchema {
1111 name: Name::try_from("duration").expect("Name is valid"),
1112 aliases: None,
1113 doc: None,
1114 size: 12,
1115 default: None,
1116 attributes: Default::default(),
1117 }),
1118 value,
1119 &inner,
1120 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1121 )
1122 }
1123
1124 #[test]
1125 fn test_writer_append() -> TestResult {
1126 let schema = Schema::parse_str(SCHEMA)?;
1127 let mut writer = Writer::new(&schema, Vec::new())?;
1128
1129 let mut record = Record::new(&schema).unwrap();
1130 record.put("a", 27i64);
1131 record.put("b", "foo");
1132
1133 let n1 = writer.append_value(record.clone())?;
1134 let n2 = writer.append_value(record.clone())?;
1135 let n3 = writer.flush()?;
1136 let result = writer.into_inner()?;
1137
1138 assert_eq!(n1 + n2 + n3, result.len());
1139
1140 let mut data = Vec::new();
1141 zig_i64(27, &mut data)?;
1142 zig_i64(3, &mut data)?;
1143 data.extend(b"foo");
1144 data.extend(data.clone());
1145
1146 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1148 let last_data_byte = result.len() - 16;
1150 assert_eq!(
1151 &result[last_data_byte - data.len()..last_data_byte],
1152 data.as_slice()
1153 );
1154
1155 Ok(())
1156 }
1157
1158 #[test]
1159 fn test_writer_extend() -> TestResult {
1160 let schema = Schema::parse_str(SCHEMA)?;
1161 let mut writer = Writer::new(&schema, Vec::new())?;
1162
1163 let mut record = Record::new(&schema).unwrap();
1164 record.put("a", 27i64);
1165 record.put("b", "foo");
1166 let record_copy = record.clone();
1167 let records = vec![record, record_copy];
1168
1169 let n1 = writer.extend(records)?;
1170 let n2 = writer.flush()?;
1171 let result = writer.into_inner()?;
1172
1173 assert_eq!(n1 + n2, result.len());
1174
1175 let mut data = Vec::new();
1176 zig_i64(27, &mut data)?;
1177 zig_i64(3, &mut data)?;
1178 data.extend(b"foo");
1179 data.extend(data.clone());
1180
1181 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1183 let last_data_byte = result.len() - 16;
1185 assert_eq!(
1186 &result[last_data_byte - data.len()..last_data_byte],
1187 data.as_slice()
1188 );
1189
1190 Ok(())
1191 }
1192
1193 #[derive(Debug, Clone, Deserialize, Serialize)]
1194 struct TestSerdeSerialize {
1195 a: i64,
1196 b: String,
1197 }
1198
1199 #[test]
1200 fn test_writer_append_ser() -> TestResult {
1201 let schema = Schema::parse_str(SCHEMA)?;
1202 let mut writer = Writer::new(&schema, Vec::new())?;
1203
1204 let record = TestSerdeSerialize {
1205 a: 27,
1206 b: "foo".to_owned(),
1207 };
1208
1209 let n1 = writer.append_ser(record)?;
1210 let n2 = writer.flush()?;
1211 let result = writer.into_inner()?;
1212
1213 assert_eq!(n1 + n2, result.len());
1214
1215 let mut data = Vec::new();
1216 zig_i64(27, &mut data)?;
1217 zig_i64(3, &mut data)?;
1218 data.extend(b"foo");
1219
1220 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1222 let last_data_byte = result.len() - 16;
1224 assert_eq!(
1225 &result[last_data_byte - data.len()..last_data_byte],
1226 data.as_slice()
1227 );
1228
1229 Ok(())
1230 }
1231
1232 #[test]
1233 fn test_writer_extend_ser() -> TestResult {
1234 let schema = Schema::parse_str(SCHEMA)?;
1235 let mut writer = Writer::new(&schema, Vec::new())?;
1236
1237 let record = TestSerdeSerialize {
1238 a: 27,
1239 b: "foo".to_owned(),
1240 };
1241 let record_copy = record.clone();
1242 let records = vec![record, record_copy];
1243
1244 let n1 = writer.extend_ser(records)?;
1245 let n2 = writer.flush()?;
1246 let result = writer.into_inner()?;
1247
1248 assert_eq!(n1 + n2, result.len());
1249
1250 let mut data = Vec::new();
1251 zig_i64(27, &mut data)?;
1252 zig_i64(3, &mut data)?;
1253 data.extend(b"foo");
1254 data.extend(data.clone());
1255
1256 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1258 let last_data_byte = result.len() - 16;
1260 assert_eq!(
1261 &result[last_data_byte - data.len()..last_data_byte],
1262 data.as_slice()
1263 );
1264
1265 Ok(())
1266 }
1267
1268 fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1269 Writer::with_codec(
1270 schema,
1271 Vec::new(),
1272 Codec::Deflate(DeflateSettings::default()),
1273 )
1274 }
1275
1276 fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1277 Writer::builder()
1278 .writer(Vec::new())
1279 .schema(schema)
1280 .codec(Codec::Deflate(DeflateSettings::default()))
1281 .block_size(100)
1282 .build()
1283 }
1284
1285 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1286 let mut record = Record::new(schema).unwrap();
1287 record.put("a", 27i64);
1288 record.put("b", "foo");
1289
1290 let n1 = writer.append_value(record.clone())?;
1291 let n2 = writer.append_value(record.clone())?;
1292 let n3 = writer.flush()?;
1293 let result = writer.into_inner()?;
1294
1295 assert_eq!(n1 + n2 + n3, result.len());
1296
1297 let mut data = Vec::new();
1298 zig_i64(27, &mut data)?;
1299 zig_i64(3, &mut data)?;
1300 data.extend(b"foo");
1301 data.extend(data.clone());
1302 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1303
1304 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1306 let last_data_byte = result.len() - 16;
1308 assert_eq!(
1309 &result[last_data_byte - data.len()..last_data_byte],
1310 data.as_slice()
1311 );
1312
1313 Ok(())
1314 }
1315
1316 #[test]
1317 fn test_writer_with_codec() -> TestResult {
1318 let schema = Schema::parse_str(SCHEMA)?;
1319 let writer = make_writer_with_codec(&schema)?;
1320 check_writer(writer, &schema)
1321 }
1322
1323 #[test]
1324 fn test_writer_with_builder() -> TestResult {
1325 let schema = Schema::parse_str(SCHEMA)?;
1326 let writer = make_writer_with_builder(&schema)?;
1327 check_writer(writer, &schema)
1328 }
1329
1330 #[test]
1331 fn test_logical_writer() -> TestResult {
1332 const LOGICAL_TYPE_SCHEMA: &str = r#"
1333 {
1334 "type": "record",
1335 "name": "logical_type_test",
1336 "fields": [
1337 {
1338 "name": "a",
1339 "type": [
1340 "null",
1341 {
1342 "type": "long",
1343 "logicalType": "timestamp-micros"
1344 }
1345 ]
1346 }
1347 ]
1348 }
1349 "#;
1350 let codec = Codec::Deflate(DeflateSettings::default());
1351 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1352 let mut writer = Writer::builder()
1353 .schema(&schema)
1354 .codec(codec)
1355 .writer(Vec::new())
1356 .build()?;
1357
1358 let mut record1 = Record::new(&schema).unwrap();
1359 record1.put(
1360 "a",
1361 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1362 );
1363
1364 let mut record2 = Record::new(&schema).unwrap();
1365 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1366
1367 let n1 = writer.append_value(record1)?;
1368 let n2 = writer.append_value(record2)?;
1369 let n3 = writer.flush()?;
1370 let result = writer.into_inner()?;
1371
1372 assert_eq!(n1 + n2 + n3, result.len());
1373
1374 let mut data = Vec::new();
1375 zig_i64(1, &mut data)?;
1377 zig_i64(1234, &mut data)?;
1378
1379 zig_i64(0, &mut data)?;
1381 codec.compress(&mut data)?;
1382
1383 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1385 let last_data_byte = result.len() - 16;
1387 assert_eq!(
1388 &result[last_data_byte - data.len()..last_data_byte],
1389 data.as_slice()
1390 );
1391
1392 Ok(())
1393 }
1394
1395 #[test]
1396 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1397 let schema = Schema::parse_str(SCHEMA)?;
1398 let mut writer = Writer::new(&schema, Vec::new())?;
1399
1400 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1401 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1402 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1403 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1404
1405 let mut record = Record::new(&schema).unwrap();
1406 record.put("a", 27i64);
1407 record.put("b", "foo");
1408
1409 writer.append_value(record.clone())?;
1410 writer.append_value(record.clone())?;
1411 writer.flush()?;
1412 let result = writer.into_inner()?;
1413
1414 assert_eq!(result.len(), 244);
1415
1416 Ok(())
1417 }
1418
1419 #[test]
1420 fn test_avro_3881_metadata_empty_body() -> TestResult {
1421 let schema = Schema::parse_str(SCHEMA)?;
1422 let mut writer = Writer::new(&schema, Vec::new())?;
1423 writer.add_user_metadata("a".to_string(), "b")?;
1424 let result = writer.into_inner()?;
1425
1426 let reader = Reader::with_schema(&schema, &result[..])?;
1427 let mut expected = HashMap::new();
1428 expected.insert("a".to_string(), vec![b'b']);
1429 assert_eq!(reader.user_metadata(), &expected);
1430 assert_eq!(reader.into_iter().count(), 0);
1431
1432 Ok(())
1433 }
1434
#[test]
fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    // Once data has been appended, adding metadata is expected to be rejected.
    writer.append_value(record)?;

    let outcome = writer
        .add_user_metadata("stringKey".to_string(), String::from("value2"))
        .map_err(Error::into_details);

    match outcome {
        Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
        Err(e @ Details::FileHeaderAlreadyWritten) => {
            assert_eq!(e.to_string(), "The file metadata is already flushed.")
        }
        Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
    }

    Ok(())
}
1458
#[test]
fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    // Keys starting with `avro.` are reserved and must be refused.
    let key = String::from("avro.stringKey");
    let outcome = writer
        .add_user_metadata(key.clone(), "value")
        .map_err(Error::into_details);

    match outcome {
        Ok(_) => {
            panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
        }
        Err(ref e @ Details::InvalidMetadataKey(_)) => {
            let expected_message = format!(
                "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
            );
            assert_eq!(e.to_string(), expected_message)
        }
        Err(e) => panic!(
            "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
        ),
    }

    Ok(())
}
1487
#[test]
fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;

    // Metadata handed to the builder up-front should end up stored on the writer unchanged.
    let user_meta_data: HashMap<String, Value> = HashMap::from([
        (
            "stringKey".to_string(),
            Value::String("stringValue".to_string()),
        ),
        ("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec())),
        ("vecKey".to_string(), Value::Bytes(vec![1, 2, 3])),
    ]);

    let writer: Writer<'_, Vec<u8>> = Writer::builder()
        .writer(Vec::new())
        .schema(&schema)
        .user_metadata(user_meta_data.clone())
        .build()?;

    assert_eq!(writer.user_metadata, user_meta_data);

    Ok(())
}
1510
1511 #[derive(Serialize, Clone)]
1512 struct TestSingleObjectWriter {
1513 a: i64,
1514 b: f64,
1515 c: Vec<String>,
1516 }
1517
1518 impl AvroSchema for TestSingleObjectWriter {
1519 fn get_schema() -> Schema {
1520 let schema = r#"
1521 {
1522 "type":"record",
1523 "name":"TestSingleObjectWrtierSerialize",
1524 "fields":[
1525 {
1526 "name":"a",
1527 "type":"long"
1528 },
1529 {
1530 "name":"b",
1531 "type":"double"
1532 },
1533 {
1534 "name":"c",
1535 "type":{
1536 "type":"array",
1537 "items":"string"
1538 }
1539 }
1540 ]
1541 }
1542 "#;
1543 Schema::parse_str(schema).unwrap()
1544 }
1545 }
1546
1547 impl From<TestSingleObjectWriter> for Value {
1548 fn from(obj: TestSingleObjectWriter) -> Value {
1549 Value::Record(vec![
1550 ("a".into(), obj.a.into()),
1551 ("b".into(), obj.b.into()),
1552 (
1553 "c".into(),
1554 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1555 ),
1556 ])
1557 }
1558 }
1559
/// Checks the single-object layout produced by `GenericSingleObjectWriter`.
///
/// The expected layout is a 2-byte marker, then the schema's Rabin fingerprint in
/// bytes 2..10, then the plain binary encoding of the value.
#[test]
fn test_single_object_writer() -> TestResult {
    let mut buf: Vec<u8> = Vec::new();
    let obj = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };
    // Parse the schema once and reuse it for the writer, the fingerprint and the reference encoding.
    let schema = TestSingleObjectWriter::get_schema();
    let mut writer =
        GenericSingleObjectWriter::new_with_capacity(&schema, 1024).expect("Should resolve schema");
    let value = obj.into();
    let written_bytes = writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    assert!(buf.len() > 10, "no bytes written");
    assert_eq!(buf.len(), written_bytes);
    // Two-byte single-object marker.
    assert_eq!(buf[0], 0xC3);
    assert_eq!(buf[1], 0x01);
    // Eight-byte Rabin fingerprint of the writer schema.
    assert_eq!(&buf[2..10], &schema.fingerprint::<Rabin>().bytes[..]);

    // The payload after the header must equal a direct binary encoding of the same value.
    let mut msg_binary = Vec::new();
    encode(&value, &schema, &mut msg_binary)
        .expect("encode should succeed because the writer already encoded this value");
    assert_eq!(&buf[10..], &msg_binary[..]);

    Ok(())
}
1599
#[test]
fn test_single_object_writer_with_header_builder() -> TestResult {
    let obj = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };
    let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
    let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
        &TestSingleObjectWriter::get_schema(),
        1024,
        GlueSchemaUuidHeader::from_uuid(schema_uuid),
    )
    .expect("Should resolve schema");

    let mut buf: Vec<u8> = Vec::new();
    let value = obj.into();
    writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    // The custom header is expected to be 0x03, 0x00, then the 16 raw UUID bytes.
    let uuid_bytes = schema_uuid.into_bytes();
    assert_eq!(buf[0], 0x03);
    assert_eq!(buf[1], 0x00);
    assert_eq!(buf[2..18], uuid_bytes[..]);
    Ok(())
}
1626
#[test]
fn test_writer_parity() -> TestResult {
    let obj1 = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut by_ref_first: Vec<u8> = Vec::new();
    let mut by_ref_second: Vec<u8> = Vec::new();
    let mut by_value: Vec<u8> = Vec::new();
    let mut generic: Vec<u8> = Vec::new();

    let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
        &TestSingleObjectWriter::get_schema(),
        1024,
    )
    .expect("Should resolve schema");
    let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
        .expect("Resolved should pass");

    // Writing the same object by reference twice must give the same bytes both times.
    for out in [&mut by_ref_first, &mut by_ref_second] {
        specific_writer
            .write_ref(&obj1, out)
            .expect("Serialization expected");
    }
    specific_writer
        .write_value(obj1.clone(), &mut by_value)
        .expect("Serialization expected");
    generic_writer
        .write_value(obj1.into(), &mut generic)
        .expect("Serialization expected");

    // Every write path (specific by ref, specific by value, generic) must agree.
    assert_eq!(by_ref_first, by_ref_second);
    assert_eq!(by_ref_second, by_value);
    assert_eq!(by_value, generic);

    Ok(())
}
1667
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
    // The struct field `time` has no same-named schema field; it should resolve to
    // `date` through that field's `time` alias.
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<i64>,
    }

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let written = writer.append_ser(Conference {
        name: "RustConf".to_string(),
        time: Some(1234567890),
    })?;
    assert_eq!(182, written);

    Ok(())
}
1700
#[test]
fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    // `time` is deliberately an f64, which the ["null", "long"] union cannot accept.
    #[derive(Debug, PartialEq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<f64>,
    }

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let conf = Conference {
        name: "RustConf".to_string(),
        time: Some(12345678.90),
    };
    let failure = match writer.append_ser(conf) {
        Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
        Err(e) => e,
    };
    assert_eq!(
        failure.to_string(),
        r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
    );
    Ok(())
}
1738
#[test]
fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "ExampleSchema",
        "fields": [
            {"name": "exampleField", "type": "string"}
        ]
    }
    "#;

    /// Cloneable sink: every clone appends into the same shared `Vec<u8>`, so the test can
    /// inspect what actually reached it through a `BufWriter`.
    #[derive(Clone, Default)]
    struct TestBuffer(Rc<RefCell<Vec<u8>>>);

    impl TestBuffer {
        fn len(&self) -> usize {
            let inner = self.0.borrow();
            inner.len()
        }
    }

    impl Write for TestBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut inner = self.0.borrow_mut();
            inner.write(buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let shared_buffer = TestBuffer::default();
    let schema = Schema::parse_str(SCHEMA)?;
    // The BufWriter keeps bytes back until it is flushed, so the byte count below only
    // matches if Writer::flush also flushes the inner writer.
    let mut writer = Writer::new(&schema, std::io::BufWriter::new(shared_buffer.clone()))?;

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("exampleField", "value");

    writer.append_value(record)?;
    writer.flush()?;

    let observed = shared_buffer.len();
    assert_eq!(
        observed, 151,
        "the test buffer was not fully written to after Writer::flush was called"
    );

    Ok(())
}
1792
#[test]
fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
    /// Self-referential record: the schema refers to `Recursive` from inside its own union.
    #[derive(Serialize)]
    struct Recursive {
        field: bool,
        recurse: Option<Box<Recursive>>,
    }

    impl AvroSchema for Recursive {
        fn get_schema() -> Schema {
            Schema::parse_str(
                r#"{
                    "name": "Recursive",
                    "type": "record",
                    "fields": [
                        { "name": "field", "type": "boolean" },
                        { "name": "recurse", "type": ["null", "Recursive"] }
                    ]
                }"#,
            )
            .unwrap()
        }
    }

    let leaf = Recursive {
        field: false,
        recurse: None,
    };
    let root = Recursive {
        field: true,
        recurse: Some(Box::new(leaf)),
    };

    let mut buffer = Vec::new();
    let writer = SpecificSingleObjectWriter::new()?;
    writer.write(root, &mut buffer)?;

    assert_eq!(
        buffer,
        &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
    );

    Ok(())
}
1837
#[test]
fn avro_rs_310_append_unvalidated_value() -> TestResult {
    let schema = Schema::String;
    let expected_error = "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String";

    // The unvalidated append paths accept a value that does not match the schema.
    let value = Value::Int(1);
    let mut writer = Writer::new(&schema, Vec::new())?;
    writer.unvalidated_append_value_ref(&value)?;
    writer.unvalidated_append_value(value)?;
    let buffer = writer.into_inner()?;

    // The trailing 16 bytes are presumably the sync marker (`marker: [u8; 16]`); the two
    // bytes just before it should be the two encoded `Int(1)` values.
    let data_end = buffer.len() - 16;
    assert_eq!(&buffer[data_end - 2..data_end], &[2, 2]);

    // The validated append paths must reject the same value with a descriptive error.
    let mut writer = Writer::new(&schema, Vec::new())?;
    let value = Value::Int(1);
    let by_ref_err = writer.append_value_ref(&value).unwrap_err();
    assert_eq!(by_ref_err.to_string(), expected_error);
    let by_value_err = writer.append_value(value).unwrap_err();
    assert_eq!(by_value_err.to_string(), expected_error);

    Ok(())
}
1866}