1use crate::{
20 encode::{encode, encode_internal, encode_to_vec},
21 rabin::Rabin,
22 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
23 ser_schema::SchemaAwareWriteSerializer,
24 types::Value,
25 AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, marker::PhantomData};
29
/// Default number of buffered bytes at which `Writer` emits a data block
/// (used as the builder default for `Writer::block_size`).
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes written at the very start of every container file header:
/// ASCII `O`, `b`, `j` followed by the format version byte `1`.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 1];
32
/// Main interface for writing Avro object container files.
///
/// Appended values are encoded into an in-memory buffer. They reach the underlying writer as a
/// data block only when the buffer grows to at least `block_size` bytes, or when
/// [`Writer::flush`] / [`Writer::into_inner`] is called. Flush before discarding the writer so
/// buffered values are not lost.
///
/// A writer can be built with the `new` / `with_*` / `append_to*` constructors or through the
/// generated `Writer::builder()`.
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is validated and encoded against.
    schema: &'a Schema,
    /// Destination that receives the file header and the data blocks.
    writer: W,
    /// Resolved form of `schema` (named-type lookup). The constructors set it eagerly; when it
    /// is `None` (e.g. builder-created writers) it is resolved on the first append.
    #[builder(skip)]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Compression codec applied to every data block; defaults to `Codec::Null`.
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Buffer size in bytes at which an append triggers a flush; defaults to `DEFAULT_BLOCK_SIZE`.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    /// Encoded values of the current, not yet flushed block.
    #[builder(skip = Vec::with_capacity(block_size))]
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    #[builder(skip)]
    num_values: usize,
    /// 16-byte sync marker written at the end of the header and after every block;
    /// randomly generated by default.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// Whether the header has already been written. The `append_to*` constructors set it to
    /// `true` because the target file already has one.
    #[builder(default = false)]
    has_header: bool,
    /// Extra key/value pairs stored in the header's metadata map.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
55
56impl<'a, W: Write> Writer<'a, W> {
57 pub fn new(schema: &'a Schema, writer: W) -> Self {
61 Writer::with_codec(schema, writer, Codec::Null)
62 }
63
64 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
67 let mut w = Self::builder()
68 .schema(schema)
69 .writer(writer)
70 .codec(codec)
71 .build();
72 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
73 w
74 }
75
76 pub fn with_schemata(
81 schema: &'a Schema,
82 schemata: Vec<&'a Schema>,
83 writer: W,
84 codec: Codec,
85 ) -> Self {
86 let mut w = Self::builder()
87 .schema(schema)
88 .writer(writer)
89 .codec(codec)
90 .build();
91 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
92 w
93 }
94
95 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
99 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
100 }
101
102 pub fn append_to_with_codec(
105 schema: &'a Schema,
106 writer: W,
107 codec: Codec,
108 marker: [u8; 16],
109 ) -> Self {
110 let mut w = Self::builder()
111 .schema(schema)
112 .writer(writer)
113 .codec(codec)
114 .marker(marker)
115 .has_header(true)
116 .build();
117 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
118 w
119 }
120
121 pub fn append_to_with_codec_schemata(
124 schema: &'a Schema,
125 schemata: Vec<&'a Schema>,
126 writer: W,
127 codec: Codec,
128 marker: [u8; 16],
129 ) -> Self {
130 let mut w = Self::builder()
131 .schema(schema)
132 .writer(writer)
133 .codec(codec)
134 .marker(marker)
135 .has_header(true)
136 .build();
137 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
138 w
139 }
140
141 pub fn schema(&self) -> &'a Schema {
143 self.schema
144 }
145
146 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
155 let n = self.maybe_write_header()?;
156
157 let avro = value.into();
158 self.append_value_ref(&avro).map(|m| m + n)
159 }
160
161 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
169 let n = self.maybe_write_header()?;
170
171 match self.resolved_schema {
173 Some(ref rs) => {
174 write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
175 self.num_values += 1;
176
177 if self.buffer.len() >= self.block_size {
178 return self.flush().map(|b| b + n);
179 }
180
181 Ok(n)
182 }
183 None => {
184 let rs = ResolvedSchema::try_from(self.schema)?;
185 self.resolved_schema = Some(rs);
186 self.append_value_ref(value)
187 }
188 }
189 }
190
191 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
201 let n = self.maybe_write_header()?;
202
203 match self.resolved_schema {
204 Some(ref rs) => {
205 let mut serializer = SchemaAwareWriteSerializer::new(
206 &mut self.buffer,
207 self.schema,
208 rs.get_names(),
209 None,
210 );
211 value.serialize(&mut serializer)?;
212 self.num_values += 1;
213
214 if self.buffer.len() >= self.block_size {
215 return self.flush().map(|b| b + n);
216 }
217
218 Ok(n)
219 }
220 None => {
221 let rs = ResolvedSchema::try_from(self.schema)?;
222 self.resolved_schema = Some(rs);
223 self.append_ser(value)
224 }
225 }
226 }
227
228 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
236 where
237 I: IntoIterator<Item = T>,
238 {
239 let mut num_bytes = 0;
254 for value in values {
255 num_bytes += self.append(value)?;
256 }
257 num_bytes += self.flush()?;
258
259 Ok(num_bytes)
260 }
261
262 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
271 where
272 I: IntoIterator<Item = T>,
273 {
274 let mut num_bytes = 0;
289 for value in values {
290 num_bytes += self.append_ser(value)?;
291 }
292 num_bytes += self.flush()?;
293
294 Ok(num_bytes)
295 }
296
297 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
305 let mut num_bytes = 0;
306 for value in values {
307 num_bytes += self.append_value_ref(value)?;
308 }
309 num_bytes += self.flush()?;
310
311 Ok(num_bytes)
312 }
313
314 pub fn flush(&mut self) -> AvroResult<usize> {
319 if self.num_values == 0 {
320 return Ok(0);
321 }
322
323 self.codec.compress(&mut self.buffer)?;
324
325 let num_values = self.num_values;
326 let stream_len = self.buffer.len();
327
328 let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
329 + self.append_raw(&stream_len.into(), &Schema::Long)?
330 + self
331 .writer
332 .write(self.buffer.as_ref())
333 .map_err(Error::WriteBytes)?
334 + self.append_marker()?;
335
336 self.buffer.clear();
337 self.num_values = 0;
338
339 self.writer.flush().map_err(Error::FlushWriter)?;
340
341 Ok(num_bytes)
342 }
343
344 pub fn into_inner(mut self) -> AvroResult<W> {
349 self.maybe_write_header()?;
350 self.flush()?;
351 Ok(self.writer)
352 }
353
354 fn append_marker(&mut self) -> AvroResult<usize> {
356 self.writer.write(&self.marker).map_err(Error::WriteMarker)
359 }
360
361 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
363 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
364 }
365
366 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
368 self.writer.write(bytes).map_err(Error::WriteBytes)
369 }
370
371 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
374 if !self.has_header {
375 if key.starts_with("avro.") {
376 return Err(Error::InvalidMetadataKey(key));
377 }
378 self.user_metadata
379 .insert(key, Value::Bytes(value.as_ref().to_vec()));
380 Ok(())
381 } else {
382 Err(Error::FileHeaderAlreadyWritten)
383 }
384 }
385
386 fn header(&self) -> Result<Vec<u8>, Error> {
388 let schema_bytes = serde_json::to_string(self.schema)
389 .map_err(Error::ConvertJsonToString)?
390 .into_bytes();
391
392 let mut metadata = HashMap::with_capacity(2);
393 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
394 metadata.insert("avro.codec", self.codec.into());
395 match self.codec {
396 #[cfg(feature = "bzip")]
397 Codec::Bzip2(settings) => {
398 metadata.insert(
399 "avro.codec.compression_level",
400 Value::Bytes(vec![settings.compression_level]),
401 );
402 }
403 #[cfg(feature = "xz")]
404 Codec::Xz(settings) => {
405 metadata.insert(
406 "avro.codec.compression_level",
407 Value::Bytes(vec![settings.compression_level]),
408 );
409 }
410 #[cfg(feature = "zstandard")]
411 Codec::Zstandard(settings) => {
412 metadata.insert(
413 "avro.codec.compression_level",
414 Value::Bytes(vec![settings.compression_level]),
415 );
416 }
417 _ => {}
418 }
419
420 for (k, v) in &self.user_metadata {
421 metadata.insert(k.as_str(), v.clone());
422 }
423
424 let mut header = Vec::new();
425 header.extend_from_slice(AVRO_OBJECT_HEADER);
426 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
427 header.extend_from_slice(&self.marker);
428
429 Ok(header)
430 }
431
432 fn maybe_write_header(&mut self) -> AvroResult<usize> {
433 if !self.has_header {
434 let header = self.header()?;
435 let n = self.append_bytes(header.as_ref())?;
436 self.has_header = true;
437 Ok(n)
438 } else {
439 Ok(0)
440 }
441 }
442}
443
444fn write_avro_datum<T: Into<Value>, W: Write>(
450 schema: &Schema,
451 value: T,
452 writer: &mut W,
453) -> Result<(), Error> {
454 let avro = value.into();
455 if !avro.validate(schema) {
456 return Err(Error::Validation);
457 }
458 encode(&avro, schema, writer)?;
459 Ok(())
460}
461
462fn write_avro_datum_schemata<T: Into<Value>>(
463 schema: &Schema,
464 schemata: Vec<&Schema>,
465 value: T,
466 buffer: &mut Vec<u8>,
467) -> AvroResult<usize> {
468 let avro = value.into();
469 let rs = ResolvedSchema::try_from(schemata)?;
470 let names = rs.get_names();
471 let enclosing_namespace = schema.namespace();
472 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
473 return Err(Error::Validation);
474 }
475 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
476}
477
478pub struct GenericSingleObjectWriter {
482 buffer: Vec<u8>,
483 resolved: ResolvedOwnedSchema,
484}
485
486impl GenericSingleObjectWriter {
487 pub fn new_with_capacity(
488 schema: &Schema,
489 initial_buffer_cap: usize,
490 ) -> AvroResult<GenericSingleObjectWriter> {
491 let fingerprint = schema.fingerprint::<Rabin>();
492 let mut buffer = Vec::with_capacity(initial_buffer_cap);
493 let header = [
494 0xC3,
495 0x01,
496 fingerprint.bytes[0],
497 fingerprint.bytes[1],
498 fingerprint.bytes[2],
499 fingerprint.bytes[3],
500 fingerprint.bytes[4],
501 fingerprint.bytes[5],
502 fingerprint.bytes[6],
503 fingerprint.bytes[7],
504 ];
505 buffer.extend_from_slice(&header);
506
507 Ok(GenericSingleObjectWriter {
508 buffer,
509 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
510 })
511 }
512
513 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
515 if self.buffer.len() != 10 {
516 Err(Error::IllegalSingleObjectWriterState)
517 } else {
518 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
519 writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
520 let len = self.buffer.len();
521 self.buffer.truncate(10);
522 Ok(len)
523 }
524 }
525
526 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
528 self.write_value_ref(&v, writer)
529 }
530}
531
532pub struct SpecificSingleObjectWriter<T>
534where
535 T: AvroSchema,
536{
537 inner: GenericSingleObjectWriter,
538 schema: Schema,
539 header_written: bool,
540 _model: PhantomData<T>,
541}
542
543impl<T> SpecificSingleObjectWriter<T>
544where
545 T: AvroSchema,
546{
547 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
548 let schema = T::get_schema();
549 Ok(SpecificSingleObjectWriter {
550 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
551 schema,
552 header_written: false,
553 _model: PhantomData,
554 })
555 }
556}
557
558impl<T> SpecificSingleObjectWriter<T>
559where
560 T: AvroSchema + Into<Value>,
561{
562 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
565 let v: Value = data.into();
566 self.inner.write_value_ref(&v, writer)
567 }
568}
569
570impl<T> SpecificSingleObjectWriter<T>
571where
572 T: AvroSchema + Serialize,
573{
574 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
577 let mut bytes_written: usize = 0;
578
579 if !self.header_written {
580 bytes_written += writer
581 .write(self.inner.buffer.as_slice())
582 .map_err(Error::WriteBytes)?;
583 self.header_written = true;
584 }
585
586 let names: HashMap<Name, &Schema> = HashMap::new();
587 let mut serializer = SchemaAwareWriteSerializer::new(writer, &self.schema, &names, None);
588 bytes_written += data.serialize(&mut serializer)?;
589
590 Ok(bytes_written)
591 }
592
593 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
596 self.write_ref(&data, writer)
597 }
598}
599
600fn write_value_ref_resolved(
601 schema: &Schema,
602 resolved_schema: &ResolvedSchema,
603 value: &Value,
604 buffer: &mut Vec<u8>,
605) -> AvroResult<usize> {
606 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
607 Some(reason) => Err(Error::ValidationWithReason {
608 value: value.clone(),
609 schema: schema.clone(),
610 reason,
611 }),
612 None => encode_internal(
613 value,
614 schema,
615 resolved_schema.get_names(),
616 &schema.namespace(),
617 buffer,
618 ),
619 }
620}
621
622fn write_value_ref_owned_resolved(
623 resolved_schema: &ResolvedOwnedSchema,
624 value: &Value,
625 buffer: &mut Vec<u8>,
626) -> AvroResult<()> {
627 let root_schema = resolved_schema.get_root_schema();
628 if let Some(reason) = value.validate_internal(
629 root_schema,
630 resolved_schema.get_names(),
631 &root_schema.namespace(),
632 ) {
633 return Err(Error::ValidationWithReason {
634 value: value.clone(),
635 schema: root_schema.clone(),
636 reason,
637 });
638 }
639 encode_internal(
640 value,
641 root_schema,
642 resolved_schema.get_names(),
643 &root_schema.namespace(),
644 buffer,
645 )?;
646 Ok(())
647}
648
649pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
656 let mut buffer = Vec::new();
657 write_avro_datum(schema, value, &mut buffer)?;
658 Ok(buffer)
659}
660
661pub fn to_avro_datum_schemata<T: Into<Value>>(
666 schema: &Schema,
667 schemata: Vec<&Schema>,
668 value: T,
669) -> AvroResult<Vec<u8>> {
670 let mut buffer = Vec::new();
671 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
672 Ok(buffer)
673}
674
675#[cfg(not(target_arch = "wasm32"))]
676fn generate_sync_marker() -> [u8; 16] {
677 let mut marker = [0_u8; 16];
678 std::iter::repeat_with(rand::random)
679 .take(16)
680 .enumerate()
681 .for_each(|(i, n)| marker[i] = n);
682 marker
683}
684
/// Generates a 16-byte sync marker on wasm32 by filling each 4-byte chunk with the big-endian
/// bytes of one `quad_rand::rand()` value.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
695
696#[cfg(test)]
697mod tests {
698 use std::{cell::RefCell, rc::Rc};
699
700 use super::*;
701 use crate::{
702 decimal::Decimal,
703 duration::{Days, Duration, Millis, Months},
704 schema::{DecimalSchema, FixedSchema, Name},
705 types::Record,
706 util::zig_i64,
707 Reader,
708 };
709 use pretty_assertions::assert_eq;
710 use serde::{Deserialize, Serialize};
711
712 use crate::codec::DeflateSettings;
713 use apache_avro_test_helper::TestResult;
714
715 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
716
717 const SCHEMA: &str = r#"
718 {
719 "type": "record",
720 "name": "test",
721 "fields": [
722 {
723 "name": "a",
724 "type": "long",
725 "default": 42
726 },
727 {
728 "name": "b",
729 "type": "string"
730 }
731 ]
732 }
733 "#;
734 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
735
736 #[test]
737 fn test_to_avro_datum() -> TestResult {
738 let schema = Schema::parse_str(SCHEMA)?;
739 let mut record = Record::new(&schema).unwrap();
740 record.put("a", 27i64);
741 record.put("b", "foo");
742
743 let mut expected = Vec::new();
744 zig_i64(27, &mut expected)?;
745 zig_i64(3, &mut expected)?;
746 expected.extend([b'f', b'o', b'o']);
747
748 assert_eq!(to_avro_datum(&schema, record)?, expected);
749
750 Ok(())
751 }
752
753 #[test]
754 fn test_union_not_null() -> TestResult {
755 let schema = Schema::parse_str(UNION_SCHEMA)?;
756 let union = Value::Union(1, Box::new(Value::Long(3)));
757
758 let mut expected = Vec::new();
759 zig_i64(1, &mut expected)?;
760 zig_i64(3, &mut expected)?;
761
762 assert_eq!(to_avro_datum(&schema, union)?, expected);
763
764 Ok(())
765 }
766
767 #[test]
768 fn test_union_null() -> TestResult {
769 let schema = Schema::parse_str(UNION_SCHEMA)?;
770 let union = Value::Union(0, Box::new(Value::Null));
771
772 let mut expected = Vec::new();
773 zig_i64(0, &mut expected)?;
774
775 assert_eq!(to_avro_datum(&schema, union)?, expected);
776
777 Ok(())
778 }
779
780 fn logical_type_test<T: Into<Value> + Clone>(
781 schema_str: &'static str,
782
783 expected_schema: &Schema,
784 value: Value,
785
786 raw_schema: &Schema,
787 raw_value: T,
788 ) -> TestResult {
789 let schema = Schema::parse_str(schema_str)?;
790 assert_eq!(&schema, expected_schema);
791 let ser = to_avro_datum(&schema, value.clone())?;
793 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
794 assert_eq!(ser, raw_ser);
795
796 let mut r = ser.as_slice();
798 let de = crate::from_avro_datum(&schema, &mut r, None)?;
799 assert_eq!(de, value);
800 Ok(())
801 }
802
803 #[test]
804 fn date() -> TestResult {
805 logical_type_test(
806 r#"{"type": "int", "logicalType": "date"}"#,
807 &Schema::Date,
808 Value::Date(1_i32),
809 &Schema::Int,
810 1_i32,
811 )
812 }
813
814 #[test]
815 fn time_millis() -> TestResult {
816 logical_type_test(
817 r#"{"type": "int", "logicalType": "time-millis"}"#,
818 &Schema::TimeMillis,
819 Value::TimeMillis(1_i32),
820 &Schema::Int,
821 1_i32,
822 )
823 }
824
825 #[test]
826 fn time_micros() -> TestResult {
827 logical_type_test(
828 r#"{"type": "long", "logicalType": "time-micros"}"#,
829 &Schema::TimeMicros,
830 Value::TimeMicros(1_i64),
831 &Schema::Long,
832 1_i64,
833 )
834 }
835
836 #[test]
837 fn timestamp_millis() -> TestResult {
838 logical_type_test(
839 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
840 &Schema::TimestampMillis,
841 Value::TimestampMillis(1_i64),
842 &Schema::Long,
843 1_i64,
844 )
845 }
846
847 #[test]
848 fn timestamp_micros() -> TestResult {
849 logical_type_test(
850 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
851 &Schema::TimestampMicros,
852 Value::TimestampMicros(1_i64),
853 &Schema::Long,
854 1_i64,
855 )
856 }
857
858 #[test]
859 fn decimal_fixed() -> TestResult {
860 let size = 30;
861 let inner = Schema::Fixed(FixedSchema {
862 name: Name::new("decimal")?,
863 aliases: None,
864 doc: None,
865 size,
866 default: None,
867 attributes: Default::default(),
868 });
869 let value = vec![0u8; size];
870 logical_type_test(
871 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
872 &Schema::Decimal(DecimalSchema {
873 precision: 20,
874 scale: 5,
875 inner: Box::new(inner.clone()),
876 }),
877 Value::Decimal(Decimal::from(value.clone())),
878 &inner,
879 Value::Fixed(size, value),
880 )
881 }
882
883 #[test]
884 fn decimal_bytes() -> TestResult {
885 let inner = Schema::Bytes;
886 let value = vec![0u8; 10];
887 logical_type_test(
888 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
889 &Schema::Decimal(DecimalSchema {
890 precision: 4,
891 scale: 3,
892 inner: Box::new(inner.clone()),
893 }),
894 Value::Decimal(Decimal::from(value.clone())),
895 &inner,
896 value,
897 )
898 }
899
900 #[test]
901 fn duration() -> TestResult {
902 let inner = Schema::Fixed(FixedSchema {
903 name: Name::new("duration")?,
904 aliases: None,
905 doc: None,
906 size: 12,
907 default: None,
908 attributes: Default::default(),
909 });
910 let value = Value::Duration(Duration::new(
911 Months::new(256),
912 Days::new(512),
913 Millis::new(1024),
914 ));
915 logical_type_test(
916 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
917 &Schema::Duration,
918 value,
919 &inner,
920 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
921 )
922 }
923
924 #[test]
925 fn test_writer_append() -> TestResult {
926 let schema = Schema::parse_str(SCHEMA)?;
927 let mut writer = Writer::new(&schema, Vec::new());
928
929 let mut record = Record::new(&schema).unwrap();
930 record.put("a", 27i64);
931 record.put("b", "foo");
932
933 let n1 = writer.append(record.clone())?;
934 let n2 = writer.append(record.clone())?;
935 let n3 = writer.flush()?;
936 let result = writer.into_inner()?;
937
938 assert_eq!(n1 + n2 + n3, result.len());
939
940 let mut data = Vec::new();
941 zig_i64(27, &mut data)?;
942 zig_i64(3, &mut data)?;
943 data.extend(b"foo");
944 data.extend(data.clone());
945
946 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
948 let last_data_byte = result.len() - 16;
950 assert_eq!(
951 &result[last_data_byte - data.len()..last_data_byte],
952 data.as_slice()
953 );
954
955 Ok(())
956 }
957
958 #[test]
959 fn test_writer_extend() -> TestResult {
960 let schema = Schema::parse_str(SCHEMA)?;
961 let mut writer = Writer::new(&schema, Vec::new());
962
963 let mut record = Record::new(&schema).unwrap();
964 record.put("a", 27i64);
965 record.put("b", "foo");
966 let record_copy = record.clone();
967 let records = vec![record, record_copy];
968
969 let n1 = writer.extend(records)?;
970 let n2 = writer.flush()?;
971 let result = writer.into_inner()?;
972
973 assert_eq!(n1 + n2, result.len());
974
975 let mut data = Vec::new();
976 zig_i64(27, &mut data)?;
977 zig_i64(3, &mut data)?;
978 data.extend(b"foo");
979 data.extend(data.clone());
980
981 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
983 let last_data_byte = result.len() - 16;
985 assert_eq!(
986 &result[last_data_byte - data.len()..last_data_byte],
987 data.as_slice()
988 );
989
990 Ok(())
991 }
992
993 #[derive(Debug, Clone, Deserialize, Serialize)]
994 struct TestSerdeSerialize {
995 a: i64,
996 b: String,
997 }
998
999 #[test]
1000 fn test_writer_append_ser() -> TestResult {
1001 let schema = Schema::parse_str(SCHEMA)?;
1002 let mut writer = Writer::new(&schema, Vec::new());
1003
1004 let record = TestSerdeSerialize {
1005 a: 27,
1006 b: "foo".to_owned(),
1007 };
1008
1009 let n1 = writer.append_ser(record)?;
1010 let n2 = writer.flush()?;
1011 let result = writer.into_inner()?;
1012
1013 assert_eq!(n1 + n2, result.len());
1014
1015 let mut data = Vec::new();
1016 zig_i64(27, &mut data)?;
1017 zig_i64(3, &mut data)?;
1018 data.extend(b"foo");
1019
1020 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1022 let last_data_byte = result.len() - 16;
1024 assert_eq!(
1025 &result[last_data_byte - data.len()..last_data_byte],
1026 data.as_slice()
1027 );
1028
1029 Ok(())
1030 }
1031
1032 #[test]
1033 fn test_writer_extend_ser() -> TestResult {
1034 let schema = Schema::parse_str(SCHEMA)?;
1035 let mut writer = Writer::new(&schema, Vec::new());
1036
1037 let record = TestSerdeSerialize {
1038 a: 27,
1039 b: "foo".to_owned(),
1040 };
1041 let record_copy = record.clone();
1042 let records = vec![record, record_copy];
1043
1044 let n1 = writer.extend_ser(records)?;
1045 let n2 = writer.flush()?;
1046 let result = writer.into_inner()?;
1047
1048 assert_eq!(n1 + n2, result.len());
1049
1050 let mut data = Vec::new();
1051 zig_i64(27, &mut data)?;
1052 zig_i64(3, &mut data)?;
1053 data.extend(b"foo");
1054 data.extend(data.clone());
1055
1056 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1058 let last_data_byte = result.len() - 16;
1060 assert_eq!(
1061 &result[last_data_byte - data.len()..last_data_byte],
1062 data.as_slice()
1063 );
1064
1065 Ok(())
1066 }
1067
1068 fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1069 Writer::with_codec(
1070 schema,
1071 Vec::new(),
1072 Codec::Deflate(DeflateSettings::default()),
1073 )
1074 }
1075
1076 fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1077 Writer::builder()
1078 .writer(Vec::new())
1079 .schema(schema)
1080 .codec(Codec::Deflate(DeflateSettings::default()))
1081 .block_size(100)
1082 .build()
1083 }
1084
1085 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1086 let mut record = Record::new(schema).unwrap();
1087 record.put("a", 27i64);
1088 record.put("b", "foo");
1089
1090 let n1 = writer.append(record.clone())?;
1091 let n2 = writer.append(record.clone())?;
1092 let n3 = writer.flush()?;
1093 let result = writer.into_inner()?;
1094
1095 assert_eq!(n1 + n2 + n3, result.len());
1096
1097 let mut data = Vec::new();
1098 zig_i64(27, &mut data)?;
1099 zig_i64(3, &mut data)?;
1100 data.extend(b"foo");
1101 data.extend(data.clone());
1102 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1103
1104 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1106 let last_data_byte = result.len() - 16;
1108 assert_eq!(
1109 &result[last_data_byte - data.len()..last_data_byte],
1110 data.as_slice()
1111 );
1112
1113 Ok(())
1114 }
1115
1116 #[test]
1117 fn test_writer_with_codec() -> TestResult {
1118 let schema = Schema::parse_str(SCHEMA)?;
1119 let writer = make_writer_with_codec(&schema);
1120 check_writer(writer, &schema)
1121 }
1122
1123 #[test]
1124 fn test_writer_with_builder() -> TestResult {
1125 let schema = Schema::parse_str(SCHEMA)?;
1126 let writer = make_writer_with_builder(&schema);
1127 check_writer(writer, &schema)
1128 }
1129
1130 #[test]
1131 fn test_logical_writer() -> TestResult {
1132 const LOGICAL_TYPE_SCHEMA: &str = r#"
1133 {
1134 "type": "record",
1135 "name": "logical_type_test",
1136 "fields": [
1137 {
1138 "name": "a",
1139 "type": [
1140 "null",
1141 {
1142 "type": "long",
1143 "logicalType": "timestamp-micros"
1144 }
1145 ]
1146 }
1147 ]
1148 }
1149 "#;
1150 let codec = Codec::Deflate(DeflateSettings::default());
1151 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1152 let mut writer = Writer::builder()
1153 .schema(&schema)
1154 .codec(codec)
1155 .writer(Vec::new())
1156 .build();
1157
1158 let mut record1 = Record::new(&schema).unwrap();
1159 record1.put(
1160 "a",
1161 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1162 );
1163
1164 let mut record2 = Record::new(&schema).unwrap();
1165 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1166
1167 let n1 = writer.append(record1)?;
1168 let n2 = writer.append(record2)?;
1169 let n3 = writer.flush()?;
1170 let result = writer.into_inner()?;
1171
1172 assert_eq!(n1 + n2 + n3, result.len());
1173
1174 let mut data = Vec::new();
1175 zig_i64(1, &mut data)?;
1177 zig_i64(1234, &mut data)?;
1178
1179 zig_i64(0, &mut data)?;
1181 codec.compress(&mut data)?;
1182
1183 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1185 let last_data_byte = result.len() - 16;
1187 assert_eq!(
1188 &result[last_data_byte - data.len()..last_data_byte],
1189 data.as_slice()
1190 );
1191
1192 Ok(())
1193 }
1194
1195 #[test]
1196 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1197 let schema = Schema::parse_str(SCHEMA)?;
1198 let mut writer = Writer::new(&schema, Vec::new());
1199
1200 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1201 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1202 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1203 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1204
1205 let mut record = Record::new(&schema).unwrap();
1206 record.put("a", 27i64);
1207 record.put("b", "foo");
1208
1209 writer.append(record.clone())?;
1210 writer.append(record.clone())?;
1211 writer.flush()?;
1212 let result = writer.into_inner()?;
1213
1214 assert_eq!(result.len(), 260);
1215
1216 Ok(())
1217 }
1218
1219 #[test]
1220 fn test_avro_3881_metadata_empty_body() -> TestResult {
1221 let schema = Schema::parse_str(SCHEMA)?;
1222 let mut writer = Writer::new(&schema, Vec::new());
1223 writer.add_user_metadata("a".to_string(), "b")?;
1224 let result = writer.into_inner()?;
1225
1226 let reader = Reader::with_schema(&schema, &result[..])?;
1227 let mut expected = HashMap::new();
1228 expected.insert("a".to_string(), vec![b'b']);
1229 assert_eq!(reader.user_metadata(), &expected);
1230 assert_eq!(reader.into_iter().count(), 0);
1231
1232 Ok(())
1233 }
1234
1235 #[test]
1236 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1237 let schema = Schema::parse_str(SCHEMA)?;
1238 let mut writer = Writer::new(&schema, Vec::new());
1239
1240 let mut record = Record::new(&schema).unwrap();
1241 record.put("a", 27i64);
1242 record.put("b", "foo");
1243 writer.append(record.clone())?;
1244
1245 match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1246 Err(e @ Error::FileHeaderAlreadyWritten) => {
1247 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1248 }
1249 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1250 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1251 }
1252
1253 Ok(())
1254 }
1255
1256 #[test]
1257 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1258 let schema = Schema::parse_str(SCHEMA)?;
1259 let mut writer = Writer::new(&schema, Vec::new());
1260
1261 let key = "avro.stringKey".to_string();
1262 match writer.add_user_metadata(key.clone(), "value") {
1263 Err(ref e @ Error::InvalidMetadataKey(_)) => {
1264 assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1265 }
1266 Err(e) => panic!(
1267 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1268 ),
1269 Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1270 }
1271
1272 Ok(())
1273 }
1274
1275 #[test]
1276 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1277 let schema = Schema::parse_str(SCHEMA)?;
1278
1279 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1280 user_meta_data.insert(
1281 "stringKey".to_string(),
1282 Value::String("stringValue".to_string()),
1283 );
1284 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1285 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1286
1287 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1288 .writer(Vec::new())
1289 .schema(&schema)
1290 .user_metadata(user_meta_data.clone())
1291 .build();
1292
1293 assert_eq!(writer.user_metadata, user_meta_data);
1294
1295 Ok(())
1296 }
1297
1298 #[derive(Serialize, Clone)]
1299 struct TestSingleObjectWriter {
1300 a: i64,
1301 b: f64,
1302 c: Vec<String>,
1303 }
1304
1305 impl AvroSchema for TestSingleObjectWriter {
1306 fn get_schema() -> Schema {
1307 let schema = r#"
1308 {
1309 "type":"record",
1310 "name":"TestSingleObjectWrtierSerialize",
1311 "fields":[
1312 {
1313 "name":"a",
1314 "type":"long"
1315 },
1316 {
1317 "name":"b",
1318 "type":"double"
1319 },
1320 {
1321 "name":"c",
1322 "type":{
1323 "type":"array",
1324 "items":"string"
1325 }
1326 }
1327 ]
1328 }
1329 "#;
1330 Schema::parse_str(schema).unwrap()
1331 }
1332 }
1333
1334 impl From<TestSingleObjectWriter> for Value {
1335 fn from(obj: TestSingleObjectWriter) -> Value {
1336 Value::Record(vec![
1337 ("a".into(), obj.a.into()),
1338 ("b".into(), obj.b.into()),
1339 (
1340 "c".into(),
1341 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1342 ),
1343 ])
1344 }
1345 }
1346
1347 #[test]
1348 fn test_single_object_writer() -> TestResult {
1349 let mut buf: Vec<u8> = Vec::new();
1350 let obj = TestSingleObjectWriter {
1351 a: 300,
1352 b: 34.555,
1353 c: vec!["cat".into(), "dog".into()],
1354 };
1355 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1356 &TestSingleObjectWriter::get_schema(),
1357 1024,
1358 )
1359 .expect("Should resolve schema");
1360 let value = obj.into();
1361 let written_bytes = writer
1362 .write_value_ref(&value, &mut buf)
1363 .expect("Error serializing properly");
1364
1365 assert!(buf.len() > 10, "no bytes written");
1366 assert_eq!(buf.len(), written_bytes);
1367 assert_eq!(buf[0], 0xC3);
1368 assert_eq!(buf[1], 0x01);
1369 assert_eq!(
1370 &buf[2..10],
1371 &TestSingleObjectWriter::get_schema()
1372 .fingerprint::<Rabin>()
1373 .bytes[..]
1374 );
1375 let mut msg_binary = Vec::new();
1376 encode(
1377 &value,
1378 &TestSingleObjectWriter::get_schema(),
1379 &mut msg_binary,
1380 )
1381 .expect("encode should have failed by here as a dependency of any writing");
1382 assert_eq!(&buf[10..], &msg_binary[..]);
1383
1384 Ok(())
1385 }
1386
#[test]
fn test_writer_parity() -> TestResult {
    // The specific writer (through both `write` and `write_value`) and the
    // generic writer must emit byte-identical output for the same record.
    let record = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
        &TestSingleObjectWriter::get_schema(),
        1024,
    )
    .expect("Should resolve schema");
    let mut specific_writer =
        SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
            .expect("Resolved should pass");

    let mut via_write = Vec::<u8>::new();
    specific_writer
        .write(record.clone(), &mut via_write)
        .expect("Serialization expected");

    let mut via_write_value = Vec::<u8>::new();
    specific_writer
        .write_value(record.clone(), &mut via_write_value)
        .expect("Serialization expected");

    let mut via_generic = Vec::<u8>::new();
    generic_writer
        .write_value(Value::from(record), &mut via_generic)
        .expect("Serialization expected");

    assert_eq!(via_write, via_write_value);
    assert_eq!(via_write, via_generic);

    Ok(())
}
1421
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
    // The Rust struct names its second field `time`, which is not a schema
    // field name but one of the aliases of `date`; the test expects
    // serialization to succeed through that alias.
    const SCHEMA: &str = r#"
{
  "type": "record",
  "name": "Conference",
  "fields": [
    {"type": "string", "name": "name"},
    {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
  ]
}"#;

    #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<i64>,
    }

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new());

    let written = writer.append_ser(Conference {
        name: "RustConf".to_string(),
        time: Some(1234567890),
    })?;

    assert_eq!(198, written);

    Ok(())
}
1454
/// Serializing a value whose type does not match the schema must fail with an
/// error that names the offending field, the record schema and the cause.
#[test]
fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
    // `date` is a nullable long, also reachable through the aliases
    // `time2` and `time`.
    const SCHEMA: &str = r#"
{
  "type": "record",
  "name": "Conference",
  "fields": [
    {"type": "string", "name": "name"},
    {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
  ]
}"#;

    #[derive(Debug, PartialEq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        // Deliberately `f64` so the value cannot be written with the
        // schema's `long` branch.
        pub time: Option<f64>,
    }

    let conf = Conference {
        name: "RustConf".to_string(),
        time: Some(12345678.90),
    };

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new());

    // `expect_err` panics (showing the written byte count) if serialization
    // unexpectedly succeeds.
    let err = writer
        .append_ser(conf)
        .expect_err("Expected an error, but the record was serialized");
    assert_eq!(
        err.to_string(),
        r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
    );

    Ok(())
}
1492
#[test]
fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
    const SCHEMA: &str = r#"
{
  "type": "record",
  "name": "ExampleSchema",
  "fields": [
    {"name": "exampleField", "type": "string"}
  ]
}
"#;

    /// Cloneable sink whose clones all append to one shared byte vector, so
    /// the test can inspect what reached it from outside the `BufWriter`.
    #[derive(Clone, Default)]
    struct SharedSink(Rc<RefCell<Vec<u8>>>);

    impl SharedSink {
        /// Number of bytes that have reached the shared vector so far.
        fn byte_count(&self) -> usize {
            self.0.borrow().len()
        }
    }

    impl Write for SharedSink {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0.borrow_mut().extend_from_slice(buf);
            Ok(buf.len())
        }

        // Nothing is buffered here; the buffering under test lives in the
        // `BufWriter` wrapped around this sink.
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let sink = SharedSink::default();
    let schema = Schema::parse_str(SCHEMA)?;
    // The `BufWriter` holds small writes in its own buffer, so `sink` only
    // sees all the bytes if `Writer::flush` also flushes its inner writer.
    let mut writer = Writer::new(&schema, std::io::BufWriter::new(sink.clone()));

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("exampleField", "value");

    writer.append(record)?;
    writer.flush()?;

    assert_eq!(
        sink.byte_count(),
        167,
        "the test buffer was not fully written to after Writer::flush was called"
    );

    Ok(())
}
1546}