1use crate::{
20 encode::{encode, encode_internal, encode_to_vec},
21 headers::{HeaderBuilder, RabinFingerprintHeader},
22 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
23 ser_schema::SchemaAwareWriteSerializer,
24 types::Value,
25 AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{
29 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
30};
31
/// Default number of buffered (not yet compressed) bytes after which a `Writer` flushes a block.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Magic bytes that open every Avro object container file: ASCII `Obj` followed by the byte 1.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
34
/// Writer for the Avro object container file format.
///
/// Appended values are encoded into an in-memory block buffer. A block (value count, byte
/// length, codec-compressed data, sync marker) is emitted to `writer` once the buffer reaches
/// `block_size` bytes or when `flush` is called. The file header is written lazily, before the
/// first block.
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is validated and encoded against.
    schema: &'a Schema,
    /// Underlying sink that receives the header and the encoded blocks.
    writer: W,
    /// Named types resolved from the schema(s). Set by the constructors, or lazily on the first
    /// append; the builder never sets it.
    #[builder(skip)]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Compression codec applied to each block; also recorded in the header.
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Threshold on buffered (uncompressed) bytes that triggers an automatic flush.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    /// Encoded values of the current, not yet flushed block.
    #[builder(skip = Vec::with_capacity(block_size))]
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    #[builder(skip)]
    num_values: usize,
    /// 16-byte sync marker written at the end of the header and after every block.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// Whether the header has already been written (the `append_to*` constructors set this,
    /// treating the header as already present in the target stream).
    #[builder(default = false)]
    has_header: bool,
    /// Extra header metadata entries written alongside the `avro.*` keys.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
61
62impl<'a, W: Write> Writer<'a, W> {
63 pub fn new(schema: &'a Schema, writer: W) -> Self {
67 Writer::with_codec(schema, writer, Codec::Null)
68 }
69
70 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
73 let mut w = Self::builder()
74 .schema(schema)
75 .writer(writer)
76 .codec(codec)
77 .build();
78 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
79 w
80 }
81
82 pub fn with_schemata(
87 schema: &'a Schema,
88 schemata: Vec<&'a Schema>,
89 writer: W,
90 codec: Codec,
91 ) -> Self {
92 let mut w = Self::builder()
93 .schema(schema)
94 .writer(writer)
95 .codec(codec)
96 .build();
97 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
98 w
99 }
100
101 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
105 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
106 }
107
108 pub fn append_to_with_codec(
111 schema: &'a Schema,
112 writer: W,
113 codec: Codec,
114 marker: [u8; 16],
115 ) -> Self {
116 let mut w = Self::builder()
117 .schema(schema)
118 .writer(writer)
119 .codec(codec)
120 .marker(marker)
121 .has_header(true)
122 .build();
123 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
124 w
125 }
126
127 pub fn append_to_with_codec_schemata(
130 schema: &'a Schema,
131 schemata: Vec<&'a Schema>,
132 writer: W,
133 codec: Codec,
134 marker: [u8; 16],
135 ) -> Self {
136 let mut w = Self::builder()
137 .schema(schema)
138 .writer(writer)
139 .codec(codec)
140 .marker(marker)
141 .has_header(true)
142 .build();
143 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
144 w
145 }
146
147 pub fn schema(&self) -> &'a Schema {
149 self.schema
150 }
151
152 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
161 let n = self.maybe_write_header()?;
162
163 let avro = value.into();
164 self.append_value_ref(&avro).map(|m| m + n)
165 }
166
167 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
175 let n = self.maybe_write_header()?;
176
177 match self.resolved_schema {
179 Some(ref rs) => {
180 write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
181 self.num_values += 1;
182
183 if self.buffer.len() >= self.block_size {
184 return self.flush().map(|b| b + n);
185 }
186
187 Ok(n)
188 }
189 None => {
190 let rs = ResolvedSchema::try_from(self.schema)?;
191 self.resolved_schema = Some(rs);
192 self.append_value_ref(value)
193 }
194 }
195 }
196
197 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
207 let n = self.maybe_write_header()?;
208
209 match self.resolved_schema {
210 Some(ref rs) => {
211 let mut serializer = SchemaAwareWriteSerializer::new(
212 &mut self.buffer,
213 self.schema,
214 rs.get_names(),
215 None,
216 );
217 value.serialize(&mut serializer)?;
218 self.num_values += 1;
219
220 if self.buffer.len() >= self.block_size {
221 return self.flush().map(|b| b + n);
222 }
223
224 Ok(n)
225 }
226 None => {
227 let rs = ResolvedSchema::try_from(self.schema)?;
228 self.resolved_schema = Some(rs);
229 self.append_ser(value)
230 }
231 }
232 }
233
234 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
242 where
243 I: IntoIterator<Item = T>,
244 {
245 let mut num_bytes = 0;
260 for value in values {
261 num_bytes += self.append(value)?;
262 }
263 num_bytes += self.flush()?;
264
265 Ok(num_bytes)
266 }
267
268 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
277 where
278 I: IntoIterator<Item = T>,
279 {
280 let mut num_bytes = 0;
295 for value in values {
296 num_bytes += self.append_ser(value)?;
297 }
298 num_bytes += self.flush()?;
299
300 Ok(num_bytes)
301 }
302
303 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
311 let mut num_bytes = 0;
312 for value in values {
313 num_bytes += self.append_value_ref(value)?;
314 }
315 num_bytes += self.flush()?;
316
317 Ok(num_bytes)
318 }
319
320 pub fn flush(&mut self) -> AvroResult<usize> {
325 if self.num_values == 0 {
326 return Ok(0);
327 }
328
329 self.codec.compress(&mut self.buffer)?;
330
331 let num_values = self.num_values;
332 let stream_len = self.buffer.len();
333
334 let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
335 + self.append_raw(&stream_len.into(), &Schema::Long)?
336 + self
337 .writer
338 .write(self.buffer.as_ref())
339 .map_err(Error::WriteBytes)?
340 + self.append_marker()?;
341
342 self.buffer.clear();
343 self.num_values = 0;
344
345 self.writer.flush().map_err(Error::FlushWriter)?;
346
347 Ok(num_bytes)
348 }
349
350 pub fn into_inner(mut self) -> AvroResult<W> {
355 self.maybe_write_header()?;
356 self.flush()?;
357
358 let mut this = ManuallyDrop::new(self);
359
360 let _resolved_schema = std::mem::take(&mut this.resolved_schema);
362 let _buffer = std::mem::take(&mut this.buffer);
363 let _user_metadata = std::mem::take(&mut this.user_metadata);
364
365 let writer = unsafe { std::ptr::read(&this.writer) };
367
368 Ok(writer)
369 }
370
371 pub fn get_ref(&self) -> &W {
376 &self.writer
377 }
378
379 pub fn get_mut(&mut self) -> &mut W {
386 &mut self.writer
387 }
388
389 fn append_marker(&mut self) -> AvroResult<usize> {
391 self.writer.write(&self.marker).map_err(Error::WriteMarker)
394 }
395
396 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
398 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
399 }
400
401 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
403 self.writer.write(bytes).map_err(Error::WriteBytes)
404 }
405
406 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
409 if !self.has_header {
410 if key.starts_with("avro.") {
411 return Err(Error::InvalidMetadataKey(key));
412 }
413 self.user_metadata
414 .insert(key, Value::Bytes(value.as_ref().to_vec()));
415 Ok(())
416 } else {
417 Err(Error::FileHeaderAlreadyWritten)
418 }
419 }
420
421 fn header(&self) -> Result<Vec<u8>, Error> {
423 let schema_bytes = serde_json::to_string(self.schema)
424 .map_err(Error::ConvertJsonToString)?
425 .into_bytes();
426
427 let mut metadata = HashMap::with_capacity(2);
428 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
429 metadata.insert("avro.codec", self.codec.into());
430 match self.codec {
431 #[cfg(feature = "bzip")]
432 Codec::Bzip2(settings) => {
433 metadata.insert(
434 "avro.codec.compression_level",
435 Value::Bytes(vec![settings.compression_level]),
436 );
437 }
438 #[cfg(feature = "xz")]
439 Codec::Xz(settings) => {
440 metadata.insert(
441 "avro.codec.compression_level",
442 Value::Bytes(vec![settings.compression_level]),
443 );
444 }
445 #[cfg(feature = "zstandard")]
446 Codec::Zstandard(settings) => {
447 metadata.insert(
448 "avro.codec.compression_level",
449 Value::Bytes(vec![settings.compression_level]),
450 );
451 }
452 _ => {}
453 }
454
455 for (k, v) in &self.user_metadata {
456 metadata.insert(k.as_str(), v.clone());
457 }
458
459 let mut header = Vec::new();
460 header.extend_from_slice(AVRO_OBJECT_HEADER);
461 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
462 header.extend_from_slice(&self.marker);
463
464 Ok(header)
465 }
466
467 fn maybe_write_header(&mut self) -> AvroResult<usize> {
468 if !self.has_header {
469 let header = self.header()?;
470 let n = self.append_bytes(header.as_ref())?;
471 self.has_header = true;
472 Ok(n)
473 } else {
474 Ok(0)
475 }
476 }
477}
478
479impl<'a, W: Write> Drop for Writer<'a, W> {
480 fn drop(&mut self) {
482 let _ = self.maybe_write_header();
483 let _ = self.flush();
484 }
485}
486
487fn write_avro_datum<T: Into<Value>, W: Write>(
493 schema: &Schema,
494 value: T,
495 writer: &mut W,
496) -> Result<(), Error> {
497 let avro = value.into();
498 if !avro.validate(schema) {
499 return Err(Error::Validation);
500 }
501 encode(&avro, schema, writer)?;
502 Ok(())
503}
504
505fn write_avro_datum_schemata<T: Into<Value>>(
506 schema: &Schema,
507 schemata: Vec<&Schema>,
508 value: T,
509 buffer: &mut Vec<u8>,
510) -> AvroResult<usize> {
511 let avro = value.into();
512 let rs = ResolvedSchema::try_from(schemata)?;
513 let names = rs.get_names();
514 let enclosing_namespace = schema.namespace();
515 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
516 return Err(Error::Validation);
517 }
518 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
519}
520
/// Writer for Avro single object encoding: every written value is emitted as a header
/// (produced by a `HeaderBuilder`) followed by the value's binary encoding.
pub struct GenericSingleObjectWriter {
    /// Holds the header; during a write the encoded value is appended after it, written out,
    /// and then truncated away again on success.
    buffer: Vec<u8>,
    /// Owned, resolved copy of the schema that values are validated and encoded against.
    resolved: ResolvedOwnedSchema,
}
528
529impl GenericSingleObjectWriter {
530 pub fn new_with_capacity(
531 schema: &Schema,
532 initial_buffer_cap: usize,
533 ) -> AvroResult<GenericSingleObjectWriter> {
534 let header_builder = RabinFingerprintHeader::from_schema(schema);
535 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
536 }
537
538 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
539 schema: &Schema,
540 initial_buffer_cap: usize,
541 header_builder: HB,
542 ) -> AvroResult<GenericSingleObjectWriter> {
543 let mut buffer = Vec::with_capacity(initial_buffer_cap);
544 let header = header_builder.build_header();
545 buffer.extend_from_slice(&header);
546
547 Ok(GenericSingleObjectWriter {
548 buffer,
549 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
550 })
551 }
552
553 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
554
555 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
557 let original_length = self.buffer.len();
558 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
559 Err(Error::IllegalSingleObjectWriterState)
560 } else {
561 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
562 writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
563 let len = self.buffer.len();
564 self.buffer.truncate(original_length);
565 Ok(len)
566 }
567 }
568
569 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
571 self.write_value_ref(&v, writer)
572 }
573}
574
575pub struct SpecificSingleObjectWriter<T>
577where
578 T: AvroSchema,
579{
580 inner: GenericSingleObjectWriter,
581 schema: Schema,
582 header_written: bool,
583 _model: PhantomData<T>,
584}
585
586impl<T> SpecificSingleObjectWriter<T>
587where
588 T: AvroSchema,
589{
590 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
591 let schema = T::get_schema();
592 Ok(SpecificSingleObjectWriter {
593 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
594 schema,
595 header_written: false,
596 _model: PhantomData,
597 })
598 }
599}
600
601impl<T> SpecificSingleObjectWriter<T>
602where
603 T: AvroSchema + Into<Value>,
604{
605 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
608 let v: Value = data.into();
609 self.inner.write_value_ref(&v, writer)
610 }
611}
612
613impl<T> SpecificSingleObjectWriter<T>
614where
615 T: AvroSchema + Serialize,
616{
617 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
620 let mut bytes_written: usize = 0;
621
622 if !self.header_written {
623 bytes_written += writer
624 .write(self.inner.buffer.as_slice())
625 .map_err(Error::WriteBytes)?;
626 self.header_written = true;
627 }
628
629 bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
630
631 Ok(bytes_written)
632 }
633
634 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
637 self.write_ref(&data, writer)
638 }
639}
640
641fn write_value_ref_resolved(
642 schema: &Schema,
643 resolved_schema: &ResolvedSchema,
644 value: &Value,
645 buffer: &mut Vec<u8>,
646) -> AvroResult<usize> {
647 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
648 Some(reason) => Err(Error::ValidationWithReason {
649 value: value.clone(),
650 schema: Box::new(schema.clone()),
651 reason,
652 }),
653 None => encode_internal(
654 value,
655 schema,
656 resolved_schema.get_names(),
657 &schema.namespace(),
658 buffer,
659 ),
660 }
661}
662
663fn write_value_ref_owned_resolved(
664 resolved_schema: &ResolvedOwnedSchema,
665 value: &Value,
666 buffer: &mut Vec<u8>,
667) -> AvroResult<()> {
668 let root_schema = resolved_schema.get_root_schema();
669 if let Some(reason) = value.validate_internal(
670 root_schema,
671 resolved_schema.get_names(),
672 &root_schema.namespace(),
673 ) {
674 return Err(Error::ValidationWithReason {
675 value: value.clone(),
676 schema: Box::new(root_schema.clone()),
677 reason,
678 });
679 }
680 encode_internal(
681 value,
682 root_schema,
683 resolved_schema.get_names(),
684 &root_schema.namespace(),
685 buffer,
686 )?;
687 Ok(())
688}
689
690pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
697 let mut buffer = Vec::new();
698 write_avro_datum(schema, value, &mut buffer)?;
699 Ok(buffer)
700}
701
702pub fn write_avro_datum_ref<T: Serialize, W: Write>(
709 schema: &Schema,
710 data: &T,
711 writer: &mut W,
712) -> AvroResult<usize> {
713 let names: HashMap<Name, &Schema> = HashMap::new();
714 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
715 let bytes_written = data.serialize(&mut serializer)?;
716 Ok(bytes_written)
717}
718
719pub fn to_avro_datum_schemata<T: Into<Value>>(
724 schema: &Schema,
725 schemata: Vec<&Schema>,
726 value: T,
727) -> AvroResult<Vec<u8>> {
728 let mut buffer = Vec::new();
729 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
730 Ok(buffer)
731}
732
733#[cfg(not(target_arch = "wasm32"))]
734fn generate_sync_marker() -> [u8; 16] {
735 let mut marker = [0_u8; 16];
736 std::iter::repeat_with(rand::random)
737 .take(16)
738 .enumerate()
739 .for_each(|(i, n)| marker[i] = n);
740 marker
741}
742
#[cfg(target_arch = "wasm32")]
/// Produces a random 16-byte sync marker on wasm32 from four `quad_rand` 32-bit draws.
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    // Each 4-byte chunk receives one draw in big-endian byte order.
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
753
754#[cfg(test)]
755mod tests {
756 use std::{cell::RefCell, rc::Rc};
757
758 use super::*;
759 use crate::{
760 decimal::Decimal,
761 duration::{Days, Duration, Millis, Months},
762 headers::GlueSchemaUuidHeader,
763 rabin::Rabin,
764 schema::{DecimalSchema, FixedSchema, Name},
765 types::Record,
766 util::zig_i64,
767 Reader,
768 };
769 use pretty_assertions::assert_eq;
770 use serde::{Deserialize, Serialize};
771 use uuid::Uuid;
772
773 use crate::codec::DeflateSettings;
774 use apache_avro_test_helper::TestResult;
775
776 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
777
778 const SCHEMA: &str = r#"
779 {
780 "type": "record",
781 "name": "test",
782 "fields": [
783 {
784 "name": "a",
785 "type": "long",
786 "default": 42
787 },
788 {
789 "name": "b",
790 "type": "string"
791 }
792 ]
793 }
794 "#;
795
796 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
797
798 #[test]
799 fn test_to_avro_datum() -> TestResult {
800 let schema = Schema::parse_str(SCHEMA)?;
801 let mut record = Record::new(&schema).unwrap();
802 record.put("a", 27i64);
803 record.put("b", "foo");
804
805 let mut expected = Vec::new();
806 zig_i64(27, &mut expected)?;
807 zig_i64(3, &mut expected)?;
808 expected.extend([b'f', b'o', b'o']);
809
810 assert_eq!(to_avro_datum(&schema, record)?, expected);
811
812 Ok(())
813 }
814
815 #[test]
816 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
817 #[derive(Serialize)]
818 struct TestStruct {
819 a: i64,
820 b: String,
821 }
822
823 let schema = Schema::parse_str(SCHEMA)?;
824 let mut writer: Vec<u8> = Vec::new();
825 let data = TestStruct {
826 a: 27,
827 b: "foo".to_string(),
828 };
829
830 let mut expected = Vec::new();
831 zig_i64(27, &mut expected)?;
832 zig_i64(3, &mut expected)?;
833 expected.extend([b'f', b'o', b'o']);
834
835 let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
836
837 assert_eq!(bytes, expected.len());
838 assert_eq!(writer, expected);
839
840 Ok(())
841 }
842
843 #[test]
844 fn test_union_not_null() -> TestResult {
845 let schema = Schema::parse_str(UNION_SCHEMA)?;
846 let union = Value::Union(1, Box::new(Value::Long(3)));
847
848 let mut expected = Vec::new();
849 zig_i64(1, &mut expected)?;
850 zig_i64(3, &mut expected)?;
851
852 assert_eq!(to_avro_datum(&schema, union)?, expected);
853
854 Ok(())
855 }
856
857 #[test]
858 fn test_union_null() -> TestResult {
859 let schema = Schema::parse_str(UNION_SCHEMA)?;
860 let union = Value::Union(0, Box::new(Value::Null));
861
862 let mut expected = Vec::new();
863 zig_i64(0, &mut expected)?;
864
865 assert_eq!(to_avro_datum(&schema, union)?, expected);
866
867 Ok(())
868 }
869
870 fn logical_type_test<T: Into<Value> + Clone>(
871 schema_str: &'static str,
872
873 expected_schema: &Schema,
874 value: Value,
875
876 raw_schema: &Schema,
877 raw_value: T,
878 ) -> TestResult {
879 let schema = Schema::parse_str(schema_str)?;
880 assert_eq!(&schema, expected_schema);
881 let ser = to_avro_datum(&schema, value.clone())?;
883 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
884 assert_eq!(ser, raw_ser);
885
886 let mut r = ser.as_slice();
888 let de = crate::from_avro_datum(&schema, &mut r, None)?;
889 assert_eq!(de, value);
890 Ok(())
891 }
892
893 #[test]
894 fn date() -> TestResult {
895 logical_type_test(
896 r#"{"type": "int", "logicalType": "date"}"#,
897 &Schema::Date,
898 Value::Date(1_i32),
899 &Schema::Int,
900 1_i32,
901 )
902 }
903
904 #[test]
905 fn time_millis() -> TestResult {
906 logical_type_test(
907 r#"{"type": "int", "logicalType": "time-millis"}"#,
908 &Schema::TimeMillis,
909 Value::TimeMillis(1_i32),
910 &Schema::Int,
911 1_i32,
912 )
913 }
914
915 #[test]
916 fn time_micros() -> TestResult {
917 logical_type_test(
918 r#"{"type": "long", "logicalType": "time-micros"}"#,
919 &Schema::TimeMicros,
920 Value::TimeMicros(1_i64),
921 &Schema::Long,
922 1_i64,
923 )
924 }
925
926 #[test]
927 fn timestamp_millis() -> TestResult {
928 logical_type_test(
929 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
930 &Schema::TimestampMillis,
931 Value::TimestampMillis(1_i64),
932 &Schema::Long,
933 1_i64,
934 )
935 }
936
937 #[test]
938 fn timestamp_micros() -> TestResult {
939 logical_type_test(
940 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
941 &Schema::TimestampMicros,
942 Value::TimestampMicros(1_i64),
943 &Schema::Long,
944 1_i64,
945 )
946 }
947
948 #[test]
949 fn decimal_fixed() -> TestResult {
950 let size = 30;
951 let inner = Schema::Fixed(FixedSchema {
952 name: Name::new("decimal")?,
953 aliases: None,
954 doc: None,
955 size,
956 default: None,
957 attributes: Default::default(),
958 });
959 let value = vec![0u8; size];
960 logical_type_test(
961 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
962 &Schema::Decimal(DecimalSchema {
963 precision: 20,
964 scale: 5,
965 inner: Box::new(inner.clone()),
966 }),
967 Value::Decimal(Decimal::from(value.clone())),
968 &inner,
969 Value::Fixed(size, value),
970 )
971 }
972
973 #[test]
974 fn decimal_bytes() -> TestResult {
975 let inner = Schema::Bytes;
976 let value = vec![0u8; 10];
977 logical_type_test(
978 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
979 &Schema::Decimal(DecimalSchema {
980 precision: 4,
981 scale: 3,
982 inner: Box::new(inner.clone()),
983 }),
984 Value::Decimal(Decimal::from(value.clone())),
985 &inner,
986 value,
987 )
988 }
989
990 #[test]
991 fn duration() -> TestResult {
992 let inner = Schema::Fixed(FixedSchema {
993 name: Name::new("duration")?,
994 aliases: None,
995 doc: None,
996 size: 12,
997 default: None,
998 attributes: Default::default(),
999 });
1000 let value = Value::Duration(Duration::new(
1001 Months::new(256),
1002 Days::new(512),
1003 Millis::new(1024),
1004 ));
1005 logical_type_test(
1006 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1007 &Schema::Duration,
1008 value,
1009 &inner,
1010 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1011 )
1012 }
1013
1014 #[test]
1015 fn test_writer_append() -> TestResult {
1016 let schema = Schema::parse_str(SCHEMA)?;
1017 let mut writer = Writer::new(&schema, Vec::new());
1018
1019 let mut record = Record::new(&schema).unwrap();
1020 record.put("a", 27i64);
1021 record.put("b", "foo");
1022
1023 let n1 = writer.append(record.clone())?;
1024 let n2 = writer.append(record.clone())?;
1025 let n3 = writer.flush()?;
1026 let result = writer.into_inner()?;
1027
1028 assert_eq!(n1 + n2 + n3, result.len());
1029
1030 let mut data = Vec::new();
1031 zig_i64(27, &mut data)?;
1032 zig_i64(3, &mut data)?;
1033 data.extend(b"foo");
1034 data.extend(data.clone());
1035
1036 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1038 let last_data_byte = result.len() - 16;
1040 assert_eq!(
1041 &result[last_data_byte - data.len()..last_data_byte],
1042 data.as_slice()
1043 );
1044
1045 Ok(())
1046 }
1047
1048 #[test]
1049 fn test_writer_extend() -> TestResult {
1050 let schema = Schema::parse_str(SCHEMA)?;
1051 let mut writer = Writer::new(&schema, Vec::new());
1052
1053 let mut record = Record::new(&schema).unwrap();
1054 record.put("a", 27i64);
1055 record.put("b", "foo");
1056 let record_copy = record.clone();
1057 let records = vec![record, record_copy];
1058
1059 let n1 = writer.extend(records)?;
1060 let n2 = writer.flush()?;
1061 let result = writer.into_inner()?;
1062
1063 assert_eq!(n1 + n2, result.len());
1064
1065 let mut data = Vec::new();
1066 zig_i64(27, &mut data)?;
1067 zig_i64(3, &mut data)?;
1068 data.extend(b"foo");
1069 data.extend(data.clone());
1070
1071 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1073 let last_data_byte = result.len() - 16;
1075 assert_eq!(
1076 &result[last_data_byte - data.len()..last_data_byte],
1077 data.as_slice()
1078 );
1079
1080 Ok(())
1081 }
1082
1083 #[derive(Debug, Clone, Deserialize, Serialize)]
1084 struct TestSerdeSerialize {
1085 a: i64,
1086 b: String,
1087 }
1088
1089 #[test]
1090 fn test_writer_append_ser() -> TestResult {
1091 let schema = Schema::parse_str(SCHEMA)?;
1092 let mut writer = Writer::new(&schema, Vec::new());
1093
1094 let record = TestSerdeSerialize {
1095 a: 27,
1096 b: "foo".to_owned(),
1097 };
1098
1099 let n1 = writer.append_ser(record)?;
1100 let n2 = writer.flush()?;
1101 let result = writer.into_inner()?;
1102
1103 assert_eq!(n1 + n2, result.len());
1104
1105 let mut data = Vec::new();
1106 zig_i64(27, &mut data)?;
1107 zig_i64(3, &mut data)?;
1108 data.extend(b"foo");
1109
1110 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1112 let last_data_byte = result.len() - 16;
1114 assert_eq!(
1115 &result[last_data_byte - data.len()..last_data_byte],
1116 data.as_slice()
1117 );
1118
1119 Ok(())
1120 }
1121
1122 #[test]
1123 fn test_writer_extend_ser() -> TestResult {
1124 let schema = Schema::parse_str(SCHEMA)?;
1125 let mut writer = Writer::new(&schema, Vec::new());
1126
1127 let record = TestSerdeSerialize {
1128 a: 27,
1129 b: "foo".to_owned(),
1130 };
1131 let record_copy = record.clone();
1132 let records = vec![record, record_copy];
1133
1134 let n1 = writer.extend_ser(records)?;
1135 let n2 = writer.flush()?;
1136 let result = writer.into_inner()?;
1137
1138 assert_eq!(n1 + n2, result.len());
1139
1140 let mut data = Vec::new();
1141 zig_i64(27, &mut data)?;
1142 zig_i64(3, &mut data)?;
1143 data.extend(b"foo");
1144 data.extend(data.clone());
1145
1146 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1148 let last_data_byte = result.len() - 16;
1150 assert_eq!(
1151 &result[last_data_byte - data.len()..last_data_byte],
1152 data.as_slice()
1153 );
1154
1155 Ok(())
1156 }
1157
1158 fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1159 Writer::with_codec(
1160 schema,
1161 Vec::new(),
1162 Codec::Deflate(DeflateSettings::default()),
1163 )
1164 }
1165
1166 fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1167 Writer::builder()
1168 .writer(Vec::new())
1169 .schema(schema)
1170 .codec(Codec::Deflate(DeflateSettings::default()))
1171 .block_size(100)
1172 .build()
1173 }
1174
1175 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1176 let mut record = Record::new(schema).unwrap();
1177 record.put("a", 27i64);
1178 record.put("b", "foo");
1179
1180 let n1 = writer.append(record.clone())?;
1181 let n2 = writer.append(record.clone())?;
1182 let n3 = writer.flush()?;
1183 let result = writer.into_inner()?;
1184
1185 assert_eq!(n1 + n2 + n3, result.len());
1186
1187 let mut data = Vec::new();
1188 zig_i64(27, &mut data)?;
1189 zig_i64(3, &mut data)?;
1190 data.extend(b"foo");
1191 data.extend(data.clone());
1192 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1193
1194 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1196 let last_data_byte = result.len() - 16;
1198 assert_eq!(
1199 &result[last_data_byte - data.len()..last_data_byte],
1200 data.as_slice()
1201 );
1202
1203 Ok(())
1204 }
1205
1206 #[test]
1207 fn test_writer_with_codec() -> TestResult {
1208 let schema = Schema::parse_str(SCHEMA)?;
1209 let writer = make_writer_with_codec(&schema);
1210 check_writer(writer, &schema)
1211 }
1212
1213 #[test]
1214 fn test_writer_with_builder() -> TestResult {
1215 let schema = Schema::parse_str(SCHEMA)?;
1216 let writer = make_writer_with_builder(&schema);
1217 check_writer(writer, &schema)
1218 }
1219
1220 #[test]
1221 fn test_logical_writer() -> TestResult {
1222 const LOGICAL_TYPE_SCHEMA: &str = r#"
1223 {
1224 "type": "record",
1225 "name": "logical_type_test",
1226 "fields": [
1227 {
1228 "name": "a",
1229 "type": [
1230 "null",
1231 {
1232 "type": "long",
1233 "logicalType": "timestamp-micros"
1234 }
1235 ]
1236 }
1237 ]
1238 }
1239 "#;
1240 let codec = Codec::Deflate(DeflateSettings::default());
1241 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1242 let mut writer = Writer::builder()
1243 .schema(&schema)
1244 .codec(codec)
1245 .writer(Vec::new())
1246 .build();
1247
1248 let mut record1 = Record::new(&schema).unwrap();
1249 record1.put(
1250 "a",
1251 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1252 );
1253
1254 let mut record2 = Record::new(&schema).unwrap();
1255 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1256
1257 let n1 = writer.append(record1)?;
1258 let n2 = writer.append(record2)?;
1259 let n3 = writer.flush()?;
1260 let result = writer.into_inner()?;
1261
1262 assert_eq!(n1 + n2 + n3, result.len());
1263
1264 let mut data = Vec::new();
1265 zig_i64(1, &mut data)?;
1267 zig_i64(1234, &mut data)?;
1268
1269 zig_i64(0, &mut data)?;
1271 codec.compress(&mut data)?;
1272
1273 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1275 let last_data_byte = result.len() - 16;
1277 assert_eq!(
1278 &result[last_data_byte - data.len()..last_data_byte],
1279 data.as_slice()
1280 );
1281
1282 Ok(())
1283 }
1284
1285 #[test]
1286 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1287 let schema = Schema::parse_str(SCHEMA)?;
1288 let mut writer = Writer::new(&schema, Vec::new());
1289
1290 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1291 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1292 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1293 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1294
1295 let mut record = Record::new(&schema).unwrap();
1296 record.put("a", 27i64);
1297 record.put("b", "foo");
1298
1299 writer.append(record.clone())?;
1300 writer.append(record.clone())?;
1301 writer.flush()?;
1302 let result = writer.into_inner()?;
1303
1304 assert_eq!(result.len(), 260);
1305
1306 Ok(())
1307 }
1308
1309 #[test]
1310 fn test_avro_3881_metadata_empty_body() -> TestResult {
1311 let schema = Schema::parse_str(SCHEMA)?;
1312 let mut writer = Writer::new(&schema, Vec::new());
1313 writer.add_user_metadata("a".to_string(), "b")?;
1314 let result = writer.into_inner()?;
1315
1316 let reader = Reader::with_schema(&schema, &result[..])?;
1317 let mut expected = HashMap::new();
1318 expected.insert("a".to_string(), vec![b'b']);
1319 assert_eq!(reader.user_metadata(), &expected);
1320 assert_eq!(reader.into_iter().count(), 0);
1321
1322 Ok(())
1323 }
1324
1325 #[test]
1326 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1327 let schema = Schema::parse_str(SCHEMA)?;
1328 let mut writer = Writer::new(&schema, Vec::new());
1329
1330 let mut record = Record::new(&schema).unwrap();
1331 record.put("a", 27i64);
1332 record.put("b", "foo");
1333 writer.append(record.clone())?;
1334
1335 match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1336 Err(e @ Error::FileHeaderAlreadyWritten) => {
1337 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1338 }
1339 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1340 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1341 }
1342
1343 Ok(())
1344 }
1345
1346 #[test]
1347 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1348 let schema = Schema::parse_str(SCHEMA)?;
1349 let mut writer = Writer::new(&schema, Vec::new());
1350
1351 let key = "avro.stringKey".to_string();
1352 match writer.add_user_metadata(key.clone(), "value") {
1353 Err(ref e @ Error::InvalidMetadataKey(_)) => {
1354 assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1355 }
1356 Err(e) => panic!(
1357 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1358 ),
1359 Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1360 }
1361
1362 Ok(())
1363 }
1364
1365 #[test]
1366 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1367 let schema = Schema::parse_str(SCHEMA)?;
1368
1369 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1370 user_meta_data.insert(
1371 "stringKey".to_string(),
1372 Value::String("stringValue".to_string()),
1373 );
1374 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1375 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1376
1377 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1378 .writer(Vec::new())
1379 .schema(&schema)
1380 .user_metadata(user_meta_data.clone())
1381 .build();
1382
1383 assert_eq!(writer.user_metadata, user_meta_data);
1384
1385 Ok(())
1386 }
1387
1388 #[derive(Serialize, Clone)]
1389 struct TestSingleObjectWriter {
1390 a: i64,
1391 b: f64,
1392 c: Vec<String>,
1393 }
1394
1395 impl AvroSchema for TestSingleObjectWriter {
1396 fn get_schema() -> Schema {
1397 let schema = r#"
1398 {
1399 "type":"record",
1400 "name":"TestSingleObjectWrtierSerialize",
1401 "fields":[
1402 {
1403 "name":"a",
1404 "type":"long"
1405 },
1406 {
1407 "name":"b",
1408 "type":"double"
1409 },
1410 {
1411 "name":"c",
1412 "type":{
1413 "type":"array",
1414 "items":"string"
1415 }
1416 }
1417 ]
1418 }
1419 "#;
1420 Schema::parse_str(schema).unwrap()
1421 }
1422 }
1423
1424 impl From<TestSingleObjectWriter> for Value {
1425 fn from(obj: TestSingleObjectWriter) -> Value {
1426 Value::Record(vec![
1427 ("a".into(), obj.a.into()),
1428 ("b".into(), obj.b.into()),
1429 (
1430 "c".into(),
1431 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1432 ),
1433 ])
1434 }
1435 }
1436
/// The generic single-object writer must emit the two marker bytes `0xC3 0x01`,
/// then the schema's 8-byte Rabin fingerprint, then the plain binary encoding of
/// the value. It must report the total number of bytes written.
#[test]
fn test_single_object_writer() -> TestResult {
    // Parse the schema once and reuse it for the writer, the fingerprint and `encode`.
    let schema = TestSingleObjectWriter::get_schema();
    let obj = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut writer = GenericSingleObjectWriter::new_with_capacity(&schema, 1024)?;
    let value: Value = obj.into();

    let mut buf: Vec<u8> = Vec::new();
    let written_bytes = writer.write_value_ref(&value, &mut buf)?;

    assert!(buf.len() > 10, "no bytes written");
    assert_eq!(buf.len(), written_bytes);

    // Header: two marker bytes followed by the schema fingerprint.
    assert_eq!(buf[0], 0xC3);
    assert_eq!(buf[1], 0x01);
    assert_eq!(&buf[2..10], &schema.fingerprint::<Rabin>().bytes[..]);

    // Everything after the header must equal the standalone binary encoding.
    let mut msg_binary = Vec::new();
    encode(&value, &schema, &mut msg_binary)?;
    assert_eq!(&buf[10..], &msg_binary[..]);

    Ok(())
}
1476
/// A writer built with a `GlueSchemaUuidHeader` must emit that header
/// (`0x03 0x00` followed by the 16 UUID bytes) in place of the default one, then
/// the binary encoding of the value. It must report the total number of bytes
/// written.
#[test]
fn test_single_object_writer_with_header_builder() -> TestResult {
    let schema = TestSingleObjectWriter::get_schema();
    let obj = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
    let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
    let mut writer =
        GenericSingleObjectWriter::new_with_capacity_and_header_builder(&schema, 1024, header_builder)?;
    let value: Value = obj.into();

    let mut buf: Vec<u8> = Vec::new();
    let written_bytes = writer.write_value_ref(&value, &mut buf)?;

    // The reported size must cover everything that reached the sink.
    assert_eq!(buf.len(), written_bytes);

    // 18-byte header: two leading bytes, then the raw UUID.
    assert_eq!(buf[0], 0x03);
    assert_eq!(buf[1], 0x00);
    assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);

    // The payload after the custom header is the plain binary encoding.
    let mut msg_binary = Vec::new();
    encode(&value, &schema, &mut msg_binary)?;
    assert_eq!(&buf[18..], &msg_binary[..]);

    Ok(())
}
1503
/// Writing the same record through three entry points must produce identical bytes:
/// - `write` on the specific writer;
/// - `write_value` on the specific writer;
/// - `write_value` on the generic writer.
#[test]
fn test_writer_parity() -> TestResult {
    let fixture = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut generic_writer =
        GenericSingleObjectWriter::new_with_capacity(&TestSingleObjectWriter::get_schema(), 1024)
            .expect("Should resolve schema");
    let mut specific_writer =
        SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
            .expect("Resolved should pass");

    let mut via_specific_write: Vec<u8> = Vec::new();
    specific_writer
        .write(fixture.clone(), &mut via_specific_write)
        .expect("Serialization expected");

    let mut via_specific_value: Vec<u8> = Vec::new();
    specific_writer
        .write_value(fixture.clone(), &mut via_specific_value)
        .expect("Serialization expected");

    let mut via_generic_value: Vec<u8> = Vec::new();
    generic_writer
        .write_value(fixture.into(), &mut via_generic_value)
        .expect("Serialization expected");

    assert_eq!(via_specific_write, via_specific_value);
    assert_eq!(via_specific_write, via_generic_value);

    Ok(())
}
1538
/// A struct field may be matched to a schema field through that field's aliases.
/// Here the Rust field `time` must serialize into the schema field `date`,
/// which lists `time` as an alias.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<i64>,
    }

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new());

    let written = writer.append_ser(Conference {
        name: String::from("RustConf"),
        time: Some(1234567890),
    })?;

    assert_eq!(198, written);

    Ok(())
}
1571
/// Serializing a value whose type does not fit the schema must fail with a detailed
/// error. Here an `f64` lands in the alias-matched field `date`, whose schema is a
/// `["null", "long"]` union. The error must name the field and the record schema
/// and explain the type mismatch.
#[test]
fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    #[derive(Debug, PartialEq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<f64>,
    }

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new());
    let conf = Conference {
        name: String::from("RustConf"),
        time: Some(12345678.90),
    };

    // Success is a failure of this test; otherwise keep the error for inspection.
    let err = match writer.append_ser(conf) {
        Err(err) => err,
        Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
    };

    assert_eq!(
        err.to_string(),
        r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
    );

    Ok(())
}
1609
/// `Writer::flush` must push data through the inner writer, even when it is a
/// `BufWriter`, all the way to the underlying sink.
#[test]
fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "ExampleSchema",
        "fields": [
            {"name": "exampleField", "type": "string"}
        ]
    }
    "#;

    /// Sink whose clones share one byte vector. The test keeps one handle and
    /// gives the other to the `BufWriter`, so it can observe what arrives.
    #[derive(Clone, Default)]
    struct TestBuffer(Rc<RefCell<Vec<u8>>>);

    impl TestBuffer {
        fn len(&self) -> usize {
            let bytes = self.0.borrow();
            bytes.len()
        }
    }

    impl Write for TestBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut inner = self.0.borrow_mut();
            inner.write(buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let sink = TestBuffer::default();
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, std::io::BufWriter::new(sink.clone()));

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("exampleField", "value");

    writer.append(record)?;
    writer.flush()?;

    // Without forwarding the flush, the bytes would still sit in the BufWriter.
    assert_eq!(
        sink.len(),
        167,
        "the test buffer was not fully written to after Writer::flush was called"
    );

    Ok(())
}
1663}