1use crate::{
20 encode::{encode, encode_internal, encode_to_vec},
21 headers::{HeaderBuilder, RabinFingerprintHeader},
22 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
23 ser_schema::SchemaAwareWriteSerializer,
24 types::Value,
25 AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, marker::PhantomData, ops::RangeInclusive};
29
/// Default number of buffered bytes at which a `Writer` flushes its current block.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes that start every object container file: `Obj` followed by `0x01`.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
32
33#[derive(bon::Builder)]
35pub struct Writer<'a, W: Write> {
36 schema: &'a Schema,
37 writer: W,
38 #[builder(skip)]
39 resolved_schema: Option<ResolvedSchema<'a>>,
40 #[builder(default = Codec::Null)]
41 codec: Codec,
42 #[builder(default = DEFAULT_BLOCK_SIZE)]
43 block_size: usize,
44 #[builder(skip = Vec::with_capacity(block_size))]
45 buffer: Vec<u8>,
46 #[builder(skip)]
47 num_values: usize,
48 #[builder(default = generate_sync_marker())]
49 marker: [u8; 16],
50 #[builder(default = false)]
51 has_header: bool,
52 #[builder(default)]
53 user_metadata: HashMap<String, Value>,
54}
55
56impl<'a, W: Write> Writer<'a, W> {
57 pub fn new(schema: &'a Schema, writer: W) -> Self {
61 Writer::with_codec(schema, writer, Codec::Null)
62 }
63
64 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
67 let mut w = Self::builder()
68 .schema(schema)
69 .writer(writer)
70 .codec(codec)
71 .build();
72 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
73 w
74 }
75
76 pub fn with_schemata(
81 schema: &'a Schema,
82 schemata: Vec<&'a Schema>,
83 writer: W,
84 codec: Codec,
85 ) -> Self {
86 let mut w = Self::builder()
87 .schema(schema)
88 .writer(writer)
89 .codec(codec)
90 .build();
91 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
92 w
93 }
94
95 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
99 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
100 }
101
102 pub fn append_to_with_codec(
105 schema: &'a Schema,
106 writer: W,
107 codec: Codec,
108 marker: [u8; 16],
109 ) -> Self {
110 let mut w = Self::builder()
111 .schema(schema)
112 .writer(writer)
113 .codec(codec)
114 .marker(marker)
115 .has_header(true)
116 .build();
117 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
118 w
119 }
120
121 pub fn append_to_with_codec_schemata(
124 schema: &'a Schema,
125 schemata: Vec<&'a Schema>,
126 writer: W,
127 codec: Codec,
128 marker: [u8; 16],
129 ) -> Self {
130 let mut w = Self::builder()
131 .schema(schema)
132 .writer(writer)
133 .codec(codec)
134 .marker(marker)
135 .has_header(true)
136 .build();
137 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
138 w
139 }
140
141 pub fn schema(&self) -> &'a Schema {
143 self.schema
144 }
145
146 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
155 let n = self.maybe_write_header()?;
156
157 let avro = value.into();
158 self.append_value_ref(&avro).map(|m| m + n)
159 }
160
161 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
169 let n = self.maybe_write_header()?;
170
171 match self.resolved_schema {
173 Some(ref rs) => {
174 write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
175 self.num_values += 1;
176
177 if self.buffer.len() >= self.block_size {
178 return self.flush().map(|b| b + n);
179 }
180
181 Ok(n)
182 }
183 None => {
184 let rs = ResolvedSchema::try_from(self.schema)?;
185 self.resolved_schema = Some(rs);
186 self.append_value_ref(value)
187 }
188 }
189 }
190
191 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
201 let n = self.maybe_write_header()?;
202
203 match self.resolved_schema {
204 Some(ref rs) => {
205 let mut serializer = SchemaAwareWriteSerializer::new(
206 &mut self.buffer,
207 self.schema,
208 rs.get_names(),
209 None,
210 );
211 value.serialize(&mut serializer)?;
212 self.num_values += 1;
213
214 if self.buffer.len() >= self.block_size {
215 return self.flush().map(|b| b + n);
216 }
217
218 Ok(n)
219 }
220 None => {
221 let rs = ResolvedSchema::try_from(self.schema)?;
222 self.resolved_schema = Some(rs);
223 self.append_ser(value)
224 }
225 }
226 }
227
228 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
236 where
237 I: IntoIterator<Item = T>,
238 {
239 let mut num_bytes = 0;
254 for value in values {
255 num_bytes += self.append(value)?;
256 }
257 num_bytes += self.flush()?;
258
259 Ok(num_bytes)
260 }
261
262 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
271 where
272 I: IntoIterator<Item = T>,
273 {
274 let mut num_bytes = 0;
289 for value in values {
290 num_bytes += self.append_ser(value)?;
291 }
292 num_bytes += self.flush()?;
293
294 Ok(num_bytes)
295 }
296
297 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
305 let mut num_bytes = 0;
306 for value in values {
307 num_bytes += self.append_value_ref(value)?;
308 }
309 num_bytes += self.flush()?;
310
311 Ok(num_bytes)
312 }
313
314 pub fn flush(&mut self) -> AvroResult<usize> {
319 if self.num_values == 0 {
320 return Ok(0);
321 }
322
323 self.codec.compress(&mut self.buffer)?;
324
325 let num_values = self.num_values;
326 let stream_len = self.buffer.len();
327
328 let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
329 + self.append_raw(&stream_len.into(), &Schema::Long)?
330 + self
331 .writer
332 .write(self.buffer.as_ref())
333 .map_err(Error::WriteBytes)?
334 + self.append_marker()?;
335
336 self.buffer.clear();
337 self.num_values = 0;
338
339 self.writer.flush().map_err(Error::FlushWriter)?;
340
341 Ok(num_bytes)
342 }
343
344 pub fn into_inner(mut self) -> AvroResult<W> {
349 self.maybe_write_header()?;
350 self.flush()?;
351 Ok(self.writer)
352 }
353
354 pub fn get_ref(&self) -> &W {
359 &self.writer
360 }
361
362 pub fn get_mut(&mut self) -> &mut W {
369 &mut self.writer
370 }
371
372 fn append_marker(&mut self) -> AvroResult<usize> {
374 self.writer.write(&self.marker).map_err(Error::WriteMarker)
377 }
378
379 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
381 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
382 }
383
384 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
386 self.writer.write(bytes).map_err(Error::WriteBytes)
387 }
388
389 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
392 if !self.has_header {
393 if key.starts_with("avro.") {
394 return Err(Error::InvalidMetadataKey(key));
395 }
396 self.user_metadata
397 .insert(key, Value::Bytes(value.as_ref().to_vec()));
398 Ok(())
399 } else {
400 Err(Error::FileHeaderAlreadyWritten)
401 }
402 }
403
404 fn header(&self) -> Result<Vec<u8>, Error> {
406 let schema_bytes = serde_json::to_string(self.schema)
407 .map_err(Error::ConvertJsonToString)?
408 .into_bytes();
409
410 let mut metadata = HashMap::with_capacity(2);
411 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
412 metadata.insert("avro.codec", self.codec.into());
413 match self.codec {
414 #[cfg(feature = "bzip")]
415 Codec::Bzip2(settings) => {
416 metadata.insert(
417 "avro.codec.compression_level",
418 Value::Bytes(vec![settings.compression_level]),
419 );
420 }
421 #[cfg(feature = "xz")]
422 Codec::Xz(settings) => {
423 metadata.insert(
424 "avro.codec.compression_level",
425 Value::Bytes(vec![settings.compression_level]),
426 );
427 }
428 #[cfg(feature = "zstandard")]
429 Codec::Zstandard(settings) => {
430 metadata.insert(
431 "avro.codec.compression_level",
432 Value::Bytes(vec![settings.compression_level]),
433 );
434 }
435 _ => {}
436 }
437
438 for (k, v) in &self.user_metadata {
439 metadata.insert(k.as_str(), v.clone());
440 }
441
442 let mut header = Vec::new();
443 header.extend_from_slice(AVRO_OBJECT_HEADER);
444 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
445 header.extend_from_slice(&self.marker);
446
447 Ok(header)
448 }
449
450 fn maybe_write_header(&mut self) -> AvroResult<usize> {
451 if !self.has_header {
452 let header = self.header()?;
453 let n = self.append_bytes(header.as_ref())?;
454 self.has_header = true;
455 Ok(n)
456 } else {
457 Ok(0)
458 }
459 }
460}
461
462fn write_avro_datum<T: Into<Value>, W: Write>(
468 schema: &Schema,
469 value: T,
470 writer: &mut W,
471) -> Result<(), Error> {
472 let avro = value.into();
473 if !avro.validate(schema) {
474 return Err(Error::Validation);
475 }
476 encode(&avro, schema, writer)?;
477 Ok(())
478}
479
480fn write_avro_datum_schemata<T: Into<Value>>(
481 schema: &Schema,
482 schemata: Vec<&Schema>,
483 value: T,
484 buffer: &mut Vec<u8>,
485) -> AvroResult<usize> {
486 let avro = value.into();
487 let rs = ResolvedSchema::try_from(schemata)?;
488 let names = rs.get_names();
489 let enclosing_namespace = schema.namespace();
490 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
491 return Err(Error::Validation);
492 }
493 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
494}
495
496pub struct GenericSingleObjectWriter {
500 buffer: Vec<u8>,
501 resolved: ResolvedOwnedSchema,
502}
503
504impl GenericSingleObjectWriter {
505 pub fn new_with_capacity(
506 schema: &Schema,
507 initial_buffer_cap: usize,
508 ) -> AvroResult<GenericSingleObjectWriter> {
509 let header_builder = RabinFingerprintHeader::from_schema(schema);
510 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
511 }
512
513 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
514 schema: &Schema,
515 initial_buffer_cap: usize,
516 header_builder: HB,
517 ) -> AvroResult<GenericSingleObjectWriter> {
518 let mut buffer = Vec::with_capacity(initial_buffer_cap);
519 let header = header_builder.build_header();
520 buffer.extend_from_slice(&header);
521
522 Ok(GenericSingleObjectWriter {
523 buffer,
524 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
525 })
526 }
527
528 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
529
530 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
532 let original_length = self.buffer.len();
533 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
534 Err(Error::IllegalSingleObjectWriterState)
535 } else {
536 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
537 writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
538 let len = self.buffer.len();
539 self.buffer.truncate(original_length);
540 Ok(len)
541 }
542 }
543
544 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
546 self.write_value_ref(&v, writer)
547 }
548}
549
550pub struct SpecificSingleObjectWriter<T>
552where
553 T: AvroSchema,
554{
555 inner: GenericSingleObjectWriter,
556 schema: Schema,
557 header_written: bool,
558 _model: PhantomData<T>,
559}
560
561impl<T> SpecificSingleObjectWriter<T>
562where
563 T: AvroSchema,
564{
565 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
566 let schema = T::get_schema();
567 Ok(SpecificSingleObjectWriter {
568 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
569 schema,
570 header_written: false,
571 _model: PhantomData,
572 })
573 }
574}
575
576impl<T> SpecificSingleObjectWriter<T>
577where
578 T: AvroSchema + Into<Value>,
579{
580 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
583 let v: Value = data.into();
584 self.inner.write_value_ref(&v, writer)
585 }
586}
587
588impl<T> SpecificSingleObjectWriter<T>
589where
590 T: AvroSchema + Serialize,
591{
592 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
595 let mut bytes_written: usize = 0;
596
597 if !self.header_written {
598 bytes_written += writer
599 .write(self.inner.buffer.as_slice())
600 .map_err(Error::WriteBytes)?;
601 self.header_written = true;
602 }
603
604 bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
605
606 Ok(bytes_written)
607 }
608
609 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
612 self.write_ref(&data, writer)
613 }
614}
615
616fn write_value_ref_resolved(
617 schema: &Schema,
618 resolved_schema: &ResolvedSchema,
619 value: &Value,
620 buffer: &mut Vec<u8>,
621) -> AvroResult<usize> {
622 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
623 Some(reason) => Err(Error::ValidationWithReason {
624 value: value.clone(),
625 schema: Box::new(schema.clone()),
626 reason,
627 }),
628 None => encode_internal(
629 value,
630 schema,
631 resolved_schema.get_names(),
632 &schema.namespace(),
633 buffer,
634 ),
635 }
636}
637
638fn write_value_ref_owned_resolved(
639 resolved_schema: &ResolvedOwnedSchema,
640 value: &Value,
641 buffer: &mut Vec<u8>,
642) -> AvroResult<()> {
643 let root_schema = resolved_schema.get_root_schema();
644 if let Some(reason) = value.validate_internal(
645 root_schema,
646 resolved_schema.get_names(),
647 &root_schema.namespace(),
648 ) {
649 return Err(Error::ValidationWithReason {
650 value: value.clone(),
651 schema: Box::new(root_schema.clone()),
652 reason,
653 });
654 }
655 encode_internal(
656 value,
657 root_schema,
658 resolved_schema.get_names(),
659 &root_schema.namespace(),
660 buffer,
661 )?;
662 Ok(())
663}
664
665pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
672 let mut buffer = Vec::new();
673 write_avro_datum(schema, value, &mut buffer)?;
674 Ok(buffer)
675}
676
677pub fn write_avro_datum_ref<T: Serialize, W: Write>(
684 schema: &Schema,
685 data: &T,
686 writer: &mut W,
687) -> AvroResult<usize> {
688 let names: HashMap<Name, &Schema> = HashMap::new();
689 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
690 let bytes_written = data.serialize(&mut serializer)?;
691 Ok(bytes_written)
692}
693
694pub fn to_avro_datum_schemata<T: Into<Value>>(
699 schema: &Schema,
700 schemata: Vec<&Schema>,
701 value: T,
702) -> AvroResult<Vec<u8>> {
703 let mut buffer = Vec::new();
704 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
705 Ok(buffer)
706}
707
708#[cfg(not(target_arch = "wasm32"))]
709fn generate_sync_marker() -> [u8; 16] {
710 let mut marker = [0_u8; 16];
711 std::iter::repeat_with(rand::random)
712 .take(16)
713 .enumerate()
714 .for_each(|(i, n)| marker[i] = n);
715 marker
716}
717
/// Generates a random 16-byte sync marker from four big-endian `quad_rand::rand` words.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
728
729#[cfg(test)]
730mod tests {
731 use std::{cell::RefCell, rc::Rc};
732
733 use super::*;
734 use crate::{
735 decimal::Decimal,
736 duration::{Days, Duration, Millis, Months},
737 headers::GlueSchemaUuidHeader,
738 rabin::Rabin,
739 schema::{DecimalSchema, FixedSchema, Name},
740 types::Record,
741 util::zig_i64,
742 Reader,
743 };
744 use pretty_assertions::assert_eq;
745 use serde::{Deserialize, Serialize};
746 use uuid::Uuid;
747
748 use crate::codec::DeflateSettings;
749 use apache_avro_test_helper::TestResult;
750
751 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
752
753 const SCHEMA: &str = r#"
754 {
755 "type": "record",
756 "name": "test",
757 "fields": [
758 {
759 "name": "a",
760 "type": "long",
761 "default": 42
762 },
763 {
764 "name": "b",
765 "type": "string"
766 }
767 ]
768 }
769 "#;
770
771 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
772
773 #[test]
774 fn test_to_avro_datum() -> TestResult {
775 let schema = Schema::parse_str(SCHEMA)?;
776 let mut record = Record::new(&schema).unwrap();
777 record.put("a", 27i64);
778 record.put("b", "foo");
779
780 let mut expected = Vec::new();
781 zig_i64(27, &mut expected)?;
782 zig_i64(3, &mut expected)?;
783 expected.extend([b'f', b'o', b'o']);
784
785 assert_eq!(to_avro_datum(&schema, record)?, expected);
786
787 Ok(())
788 }
789
790 #[test]
791 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
792 #[derive(Serialize)]
793 struct TestStruct {
794 a: i64,
795 b: String,
796 }
797
798 let schema = Schema::parse_str(SCHEMA)?;
799 let mut writer: Vec<u8> = Vec::new();
800 let data = TestStruct {
801 a: 27,
802 b: "foo".to_string(),
803 };
804
805 let mut expected = Vec::new();
806 zig_i64(27, &mut expected)?;
807 zig_i64(3, &mut expected)?;
808 expected.extend([b'f', b'o', b'o']);
809
810 let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
811
812 assert_eq!(bytes, expected.len());
813 assert_eq!(writer, expected);
814
815 Ok(())
816 }
817
818 #[test]
819 fn test_union_not_null() -> TestResult {
820 let schema = Schema::parse_str(UNION_SCHEMA)?;
821 let union = Value::Union(1, Box::new(Value::Long(3)));
822
823 let mut expected = Vec::new();
824 zig_i64(1, &mut expected)?;
825 zig_i64(3, &mut expected)?;
826
827 assert_eq!(to_avro_datum(&schema, union)?, expected);
828
829 Ok(())
830 }
831
832 #[test]
833 fn test_union_null() -> TestResult {
834 let schema = Schema::parse_str(UNION_SCHEMA)?;
835 let union = Value::Union(0, Box::new(Value::Null));
836
837 let mut expected = Vec::new();
838 zig_i64(0, &mut expected)?;
839
840 assert_eq!(to_avro_datum(&schema, union)?, expected);
841
842 Ok(())
843 }
844
845 fn logical_type_test<T: Into<Value> + Clone>(
846 schema_str: &'static str,
847
848 expected_schema: &Schema,
849 value: Value,
850
851 raw_schema: &Schema,
852 raw_value: T,
853 ) -> TestResult {
854 let schema = Schema::parse_str(schema_str)?;
855 assert_eq!(&schema, expected_schema);
856 let ser = to_avro_datum(&schema, value.clone())?;
858 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
859 assert_eq!(ser, raw_ser);
860
861 let mut r = ser.as_slice();
863 let de = crate::from_avro_datum(&schema, &mut r, None)?;
864 assert_eq!(de, value);
865 Ok(())
866 }
867
868 #[test]
869 fn date() -> TestResult {
870 logical_type_test(
871 r#"{"type": "int", "logicalType": "date"}"#,
872 &Schema::Date,
873 Value::Date(1_i32),
874 &Schema::Int,
875 1_i32,
876 )
877 }
878
879 #[test]
880 fn time_millis() -> TestResult {
881 logical_type_test(
882 r#"{"type": "int", "logicalType": "time-millis"}"#,
883 &Schema::TimeMillis,
884 Value::TimeMillis(1_i32),
885 &Schema::Int,
886 1_i32,
887 )
888 }
889
890 #[test]
891 fn time_micros() -> TestResult {
892 logical_type_test(
893 r#"{"type": "long", "logicalType": "time-micros"}"#,
894 &Schema::TimeMicros,
895 Value::TimeMicros(1_i64),
896 &Schema::Long,
897 1_i64,
898 )
899 }
900
901 #[test]
902 fn timestamp_millis() -> TestResult {
903 logical_type_test(
904 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
905 &Schema::TimestampMillis,
906 Value::TimestampMillis(1_i64),
907 &Schema::Long,
908 1_i64,
909 )
910 }
911
912 #[test]
913 fn timestamp_micros() -> TestResult {
914 logical_type_test(
915 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
916 &Schema::TimestampMicros,
917 Value::TimestampMicros(1_i64),
918 &Schema::Long,
919 1_i64,
920 )
921 }
922
923 #[test]
924 fn decimal_fixed() -> TestResult {
925 let size = 30;
926 let inner = Schema::Fixed(FixedSchema {
927 name: Name::new("decimal")?,
928 aliases: None,
929 doc: None,
930 size,
931 default: None,
932 attributes: Default::default(),
933 });
934 let value = vec![0u8; size];
935 logical_type_test(
936 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
937 &Schema::Decimal(DecimalSchema {
938 precision: 20,
939 scale: 5,
940 inner: Box::new(inner.clone()),
941 }),
942 Value::Decimal(Decimal::from(value.clone())),
943 &inner,
944 Value::Fixed(size, value),
945 )
946 }
947
948 #[test]
949 fn decimal_bytes() -> TestResult {
950 let inner = Schema::Bytes;
951 let value = vec![0u8; 10];
952 logical_type_test(
953 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
954 &Schema::Decimal(DecimalSchema {
955 precision: 4,
956 scale: 3,
957 inner: Box::new(inner.clone()),
958 }),
959 Value::Decimal(Decimal::from(value.clone())),
960 &inner,
961 value,
962 )
963 }
964
965 #[test]
966 fn duration() -> TestResult {
967 let inner = Schema::Fixed(FixedSchema {
968 name: Name::new("duration")?,
969 aliases: None,
970 doc: None,
971 size: 12,
972 default: None,
973 attributes: Default::default(),
974 });
975 let value = Value::Duration(Duration::new(
976 Months::new(256),
977 Days::new(512),
978 Millis::new(1024),
979 ));
980 logical_type_test(
981 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
982 &Schema::Duration,
983 value,
984 &inner,
985 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
986 )
987 }
988
989 #[test]
990 fn test_writer_append() -> TestResult {
991 let schema = Schema::parse_str(SCHEMA)?;
992 let mut writer = Writer::new(&schema, Vec::new());
993
994 let mut record = Record::new(&schema).unwrap();
995 record.put("a", 27i64);
996 record.put("b", "foo");
997
998 let n1 = writer.append(record.clone())?;
999 let n2 = writer.append(record.clone())?;
1000 let n3 = writer.flush()?;
1001 let result = writer.into_inner()?;
1002
1003 assert_eq!(n1 + n2 + n3, result.len());
1004
1005 let mut data = Vec::new();
1006 zig_i64(27, &mut data)?;
1007 zig_i64(3, &mut data)?;
1008 data.extend(b"foo");
1009 data.extend(data.clone());
1010
1011 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1013 let last_data_byte = result.len() - 16;
1015 assert_eq!(
1016 &result[last_data_byte - data.len()..last_data_byte],
1017 data.as_slice()
1018 );
1019
1020 Ok(())
1021 }
1022
1023 #[test]
1024 fn test_writer_extend() -> TestResult {
1025 let schema = Schema::parse_str(SCHEMA)?;
1026 let mut writer = Writer::new(&schema, Vec::new());
1027
1028 let mut record = Record::new(&schema).unwrap();
1029 record.put("a", 27i64);
1030 record.put("b", "foo");
1031 let record_copy = record.clone();
1032 let records = vec![record, record_copy];
1033
1034 let n1 = writer.extend(records)?;
1035 let n2 = writer.flush()?;
1036 let result = writer.into_inner()?;
1037
1038 assert_eq!(n1 + n2, result.len());
1039
1040 let mut data = Vec::new();
1041 zig_i64(27, &mut data)?;
1042 zig_i64(3, &mut data)?;
1043 data.extend(b"foo");
1044 data.extend(data.clone());
1045
1046 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1048 let last_data_byte = result.len() - 16;
1050 assert_eq!(
1051 &result[last_data_byte - data.len()..last_data_byte],
1052 data.as_slice()
1053 );
1054
1055 Ok(())
1056 }
1057
1058 #[derive(Debug, Clone, Deserialize, Serialize)]
1059 struct TestSerdeSerialize {
1060 a: i64,
1061 b: String,
1062 }
1063
1064 #[test]
1065 fn test_writer_append_ser() -> TestResult {
1066 let schema = Schema::parse_str(SCHEMA)?;
1067 let mut writer = Writer::new(&schema, Vec::new());
1068
1069 let record = TestSerdeSerialize {
1070 a: 27,
1071 b: "foo".to_owned(),
1072 };
1073
1074 let n1 = writer.append_ser(record)?;
1075 let n2 = writer.flush()?;
1076 let result = writer.into_inner()?;
1077
1078 assert_eq!(n1 + n2, result.len());
1079
1080 let mut data = Vec::new();
1081 zig_i64(27, &mut data)?;
1082 zig_i64(3, &mut data)?;
1083 data.extend(b"foo");
1084
1085 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1087 let last_data_byte = result.len() - 16;
1089 assert_eq!(
1090 &result[last_data_byte - data.len()..last_data_byte],
1091 data.as_slice()
1092 );
1093
1094 Ok(())
1095 }
1096
1097 #[test]
1098 fn test_writer_extend_ser() -> TestResult {
1099 let schema = Schema::parse_str(SCHEMA)?;
1100 let mut writer = Writer::new(&schema, Vec::new());
1101
1102 let record = TestSerdeSerialize {
1103 a: 27,
1104 b: "foo".to_owned(),
1105 };
1106 let record_copy = record.clone();
1107 let records = vec![record, record_copy];
1108
1109 let n1 = writer.extend_ser(records)?;
1110 let n2 = writer.flush()?;
1111 let result = writer.into_inner()?;
1112
1113 assert_eq!(n1 + n2, result.len());
1114
1115 let mut data = Vec::new();
1116 zig_i64(27, &mut data)?;
1117 zig_i64(3, &mut data)?;
1118 data.extend(b"foo");
1119 data.extend(data.clone());
1120
1121 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1123 let last_data_byte = result.len() - 16;
1125 assert_eq!(
1126 &result[last_data_byte - data.len()..last_data_byte],
1127 data.as_slice()
1128 );
1129
1130 Ok(())
1131 }
1132
1133 fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1134 Writer::with_codec(
1135 schema,
1136 Vec::new(),
1137 Codec::Deflate(DeflateSettings::default()),
1138 )
1139 }
1140
1141 fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1142 Writer::builder()
1143 .writer(Vec::new())
1144 .schema(schema)
1145 .codec(Codec::Deflate(DeflateSettings::default()))
1146 .block_size(100)
1147 .build()
1148 }
1149
1150 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1151 let mut record = Record::new(schema).unwrap();
1152 record.put("a", 27i64);
1153 record.put("b", "foo");
1154
1155 let n1 = writer.append(record.clone())?;
1156 let n2 = writer.append(record.clone())?;
1157 let n3 = writer.flush()?;
1158 let result = writer.into_inner()?;
1159
1160 assert_eq!(n1 + n2 + n3, result.len());
1161
1162 let mut data = Vec::new();
1163 zig_i64(27, &mut data)?;
1164 zig_i64(3, &mut data)?;
1165 data.extend(b"foo");
1166 data.extend(data.clone());
1167 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1168
1169 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1171 let last_data_byte = result.len() - 16;
1173 assert_eq!(
1174 &result[last_data_byte - data.len()..last_data_byte],
1175 data.as_slice()
1176 );
1177
1178 Ok(())
1179 }
1180
1181 #[test]
1182 fn test_writer_with_codec() -> TestResult {
1183 let schema = Schema::parse_str(SCHEMA)?;
1184 let writer = make_writer_with_codec(&schema);
1185 check_writer(writer, &schema)
1186 }
1187
1188 #[test]
1189 fn test_writer_with_builder() -> TestResult {
1190 let schema = Schema::parse_str(SCHEMA)?;
1191 let writer = make_writer_with_builder(&schema);
1192 check_writer(writer, &schema)
1193 }
1194
1195 #[test]
1196 fn test_logical_writer() -> TestResult {
1197 const LOGICAL_TYPE_SCHEMA: &str = r#"
1198 {
1199 "type": "record",
1200 "name": "logical_type_test",
1201 "fields": [
1202 {
1203 "name": "a",
1204 "type": [
1205 "null",
1206 {
1207 "type": "long",
1208 "logicalType": "timestamp-micros"
1209 }
1210 ]
1211 }
1212 ]
1213 }
1214 "#;
1215 let codec = Codec::Deflate(DeflateSettings::default());
1216 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1217 let mut writer = Writer::builder()
1218 .schema(&schema)
1219 .codec(codec)
1220 .writer(Vec::new())
1221 .build();
1222
1223 let mut record1 = Record::new(&schema).unwrap();
1224 record1.put(
1225 "a",
1226 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1227 );
1228
1229 let mut record2 = Record::new(&schema).unwrap();
1230 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1231
1232 let n1 = writer.append(record1)?;
1233 let n2 = writer.append(record2)?;
1234 let n3 = writer.flush()?;
1235 let result = writer.into_inner()?;
1236
1237 assert_eq!(n1 + n2 + n3, result.len());
1238
1239 let mut data = Vec::new();
1240 zig_i64(1, &mut data)?;
1242 zig_i64(1234, &mut data)?;
1243
1244 zig_i64(0, &mut data)?;
1246 codec.compress(&mut data)?;
1247
1248 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1250 let last_data_byte = result.len() - 16;
1252 assert_eq!(
1253 &result[last_data_byte - data.len()..last_data_byte],
1254 data.as_slice()
1255 );
1256
1257 Ok(())
1258 }
1259
1260 #[test]
1261 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1262 let schema = Schema::parse_str(SCHEMA)?;
1263 let mut writer = Writer::new(&schema, Vec::new());
1264
1265 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1266 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1267 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1268 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1269
1270 let mut record = Record::new(&schema).unwrap();
1271 record.put("a", 27i64);
1272 record.put("b", "foo");
1273
1274 writer.append(record.clone())?;
1275 writer.append(record.clone())?;
1276 writer.flush()?;
1277 let result = writer.into_inner()?;
1278
1279 assert_eq!(result.len(), 260);
1280
1281 Ok(())
1282 }
1283
1284 #[test]
1285 fn test_avro_3881_metadata_empty_body() -> TestResult {
1286 let schema = Schema::parse_str(SCHEMA)?;
1287 let mut writer = Writer::new(&schema, Vec::new());
1288 writer.add_user_metadata("a".to_string(), "b")?;
1289 let result = writer.into_inner()?;
1290
1291 let reader = Reader::with_schema(&schema, &result[..])?;
1292 let mut expected = HashMap::new();
1293 expected.insert("a".to_string(), vec![b'b']);
1294 assert_eq!(reader.user_metadata(), &expected);
1295 assert_eq!(reader.into_iter().count(), 0);
1296
1297 Ok(())
1298 }
1299
1300 #[test]
1301 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1302 let schema = Schema::parse_str(SCHEMA)?;
1303 let mut writer = Writer::new(&schema, Vec::new());
1304
1305 let mut record = Record::new(&schema).unwrap();
1306 record.put("a", 27i64);
1307 record.put("b", "foo");
1308 writer.append(record.clone())?;
1309
1310 match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1311 Err(e @ Error::FileHeaderAlreadyWritten) => {
1312 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1313 }
1314 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1315 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1316 }
1317
1318 Ok(())
1319 }
1320
1321 #[test]
1322 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1323 let schema = Schema::parse_str(SCHEMA)?;
1324 let mut writer = Writer::new(&schema, Vec::new());
1325
1326 let key = "avro.stringKey".to_string();
1327 match writer.add_user_metadata(key.clone(), "value") {
1328 Err(ref e @ Error::InvalidMetadataKey(_)) => {
1329 assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1330 }
1331 Err(e) => panic!(
1332 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1333 ),
1334 Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1335 }
1336
1337 Ok(())
1338 }
1339
1340 #[test]
1341 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1342 let schema = Schema::parse_str(SCHEMA)?;
1343
1344 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1345 user_meta_data.insert(
1346 "stringKey".to_string(),
1347 Value::String("stringValue".to_string()),
1348 );
1349 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1350 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1351
1352 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1353 .writer(Vec::new())
1354 .schema(&schema)
1355 .user_metadata(user_meta_data.clone())
1356 .build();
1357
1358 assert_eq!(writer.user_metadata, user_meta_data);
1359
1360 Ok(())
1361 }
1362
1363 #[derive(Serialize, Clone)]
1364 struct TestSingleObjectWriter {
1365 a: i64,
1366 b: f64,
1367 c: Vec<String>,
1368 }
1369
1370 impl AvroSchema for TestSingleObjectWriter {
1371 fn get_schema() -> Schema {
1372 let schema = r#"
1373 {
1374 "type":"record",
1375 "name":"TestSingleObjectWrtierSerialize",
1376 "fields":[
1377 {
1378 "name":"a",
1379 "type":"long"
1380 },
1381 {
1382 "name":"b",
1383 "type":"double"
1384 },
1385 {
1386 "name":"c",
1387 "type":{
1388 "type":"array",
1389 "items":"string"
1390 }
1391 }
1392 ]
1393 }
1394 "#;
1395 Schema::parse_str(schema).unwrap()
1396 }
1397 }
1398
1399 impl From<TestSingleObjectWriter> for Value {
1400 fn from(obj: TestSingleObjectWriter) -> Value {
1401 Value::Record(vec![
1402 ("a".into(), obj.a.into()),
1403 ("b".into(), obj.b.into()),
1404 (
1405 "c".into(),
1406 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1407 ),
1408 ])
1409 }
1410 }
1411
1412 #[test]
1413 fn test_single_object_writer() -> TestResult {
1414 let mut buf: Vec<u8> = Vec::new();
1415 let obj = TestSingleObjectWriter {
1416 a: 300,
1417 b: 34.555,
1418 c: vec!["cat".into(), "dog".into()],
1419 };
1420 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1421 &TestSingleObjectWriter::get_schema(),
1422 1024,
1423 )
1424 .expect("Should resolve schema");
1425 let value = obj.into();
1426 let written_bytes = writer
1427 .write_value_ref(&value, &mut buf)
1428 .expect("Error serializing properly");
1429
1430 assert!(buf.len() > 10, "no bytes written");
1431 assert_eq!(buf.len(), written_bytes);
1432 assert_eq!(buf[0], 0xC3);
1433 assert_eq!(buf[1], 0x01);
1434 assert_eq!(
1435 &buf[2..10],
1436 &TestSingleObjectWriter::get_schema()
1437 .fingerprint::<Rabin>()
1438 .bytes[..]
1439 );
1440 let mut msg_binary = Vec::new();
1441 encode(
1442 &value,
1443 &TestSingleObjectWriter::get_schema(),
1444 &mut msg_binary,
1445 )
1446 .expect("encode should have failed by here as a dependency of any writing");
1447 assert_eq!(&buf[10..], &msg_binary[..]);
1448
1449 Ok(())
1450 }
1451
1452 #[test]
1453 fn test_single_object_writer_with_header_builder() -> TestResult {
1454 let mut buf: Vec<u8> = Vec::new();
1455 let obj = TestSingleObjectWriter {
1456 a: 300,
1457 b: 34.555,
1458 c: vec!["cat".into(), "dog".into()],
1459 };
1460 let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1461 let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1462 let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1463 &TestSingleObjectWriter::get_schema(),
1464 1024,
1465 header_builder,
1466 )
1467 .expect("Should resolve schema");
1468 let value = obj.into();
1469 writer
1470 .write_value_ref(&value, &mut buf)
1471 .expect("Error serializing properly");
1472
1473 assert_eq!(buf[0], 0x03);
1474 assert_eq!(buf[1], 0x00);
1475 assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1476 Ok(())
1477 }
1478
1479 #[test]
1480 fn test_writer_parity() -> TestResult {
1481 let obj1 = TestSingleObjectWriter {
1482 a: 300,
1483 b: 34.555,
1484 c: vec!["cat".into(), "dog".into()],
1485 };
1486
1487 let mut buf1: Vec<u8> = Vec::new();
1488 let mut buf2: Vec<u8> = Vec::new();
1489 let mut buf3: Vec<u8> = Vec::new();
1490
1491 let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1492 &TestSingleObjectWriter::get_schema(),
1493 1024,
1494 )
1495 .expect("Should resolve schema");
1496 let mut specific_writer =
1497 SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1498 .expect("Resolved should pass");
1499 specific_writer
1500 .write(obj1.clone(), &mut buf1)
1501 .expect("Serialization expected");
1502 specific_writer
1503 .write_value(obj1.clone(), &mut buf2)
1504 .expect("Serialization expected");
1505 generic_writer
1506 .write_value(obj1.into(), &mut buf3)
1507 .expect("Serialization expected");
1508 assert_eq!(buf1, buf2);
1509 assert_eq!(buf1, buf3);
1510
1511 Ok(())
1512 }
1513
1514 #[test]
1515 fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1516 const SCHEMA: &str = r#"
1517 {
1518 "type": "record",
1519 "name": "Conference",
1520 "fields": [
1521 {"type": "string", "name": "name"},
1522 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1523 ]
1524 }"#;
1525
1526 #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1527 pub struct Conference {
1528 pub name: String,
1529 pub time: Option<i64>,
1530 }
1531
1532 let conf = Conference {
1533 name: "RustConf".to_string(),
1534 time: Some(1234567890),
1535 };
1536
1537 let schema = Schema::parse_str(SCHEMA)?;
1538 let mut writer = Writer::new(&schema, Vec::new());
1539
1540 let bytes = writer.append_ser(conf)?;
1541
1542 assert_eq!(198, bytes);
1543
1544 Ok(())
1545 }
1546
1547 #[test]
1548 fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1549 const SCHEMA: &str = r#"
1550 {
1551 "type": "record",
1552 "name": "Conference",
1553 "fields": [
1554 {"type": "string", "name": "name"},
1555 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1556 ]
1557 }"#;
1558
1559 #[derive(Debug, PartialEq, Clone, Serialize)]
1560 pub struct Conference {
1561 pub name: String,
1562 pub time: Option<f64>, }
1564
1565 let conf = Conference {
1566 name: "RustConf".to_string(),
1567 time: Some(12345678.90),
1568 };
1569
1570 let schema = Schema::parse_str(SCHEMA)?;
1571 let mut writer = Writer::new(&schema, Vec::new());
1572
1573 match writer.append_ser(conf) {
1574 Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1575 Err(e) => {
1576 assert_eq!(
1577 e.to_string(),
1578 r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
1579 );
1580 }
1581 }
1582 Ok(())
1583 }
1584
1585 #[test]
1586 fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1587 const SCHEMA: &str = r#"
1588 {
1589 "type": "record",
1590 "name": "ExampleSchema",
1591 "fields": [
1592 {"name": "exampleField", "type": "string"}
1593 ]
1594 }
1595 "#;
1596
1597 #[derive(Clone, Default)]
1598 struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1599
1600 impl TestBuffer {
1601 fn len(&self) -> usize {
1602 self.0.borrow().len()
1603 }
1604 }
1605
1606 impl Write for TestBuffer {
1607 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1608 self.0.borrow_mut().write(buf)
1609 }
1610
1611 fn flush(&mut self) -> std::io::Result<()> {
1612 Ok(())
1613 }
1614 }
1615
1616 let shared_buffer = TestBuffer::default();
1617
1618 let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1619
1620 let schema = Schema::parse_str(SCHEMA)?;
1621
1622 let mut writer = Writer::new(&schema, buffered_writer);
1623
1624 let mut record = Record::new(writer.schema()).unwrap();
1625 record.put("exampleField", "value");
1626
1627 writer.append(record)?;
1628 writer.flush()?;
1629
1630 assert_eq!(
1631 shared_buffer.len(),
1632 167,
1633 "the test buffer was not fully written to after Writer::flush was called"
1634 );
1635
1636 Ok(())
1637 }
1638}