1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 schema::{ResolvedSchema, Schema},
24 serde::ser_schema::SchemaAwareWriteSerializer,
25 types::Value,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, mem::ManuallyDrop};
29
30pub mod datum;
31pub mod single_object;
32
/// Default number of buffered (encoded) bytes after which an append flushes a data block.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes that open every Avro object container file: `Obj` followed by version byte `1`.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 1];
35
/// Writes Avro values into an Avro object container file.
///
/// Encoded values are buffered in memory and written out as (optionally compressed) data
/// blocks, each followed by the sync marker. The file header is written lazily.
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is encoded with; it is also serialized into the file header.
    schema: &'a Schema,
    /// Destination of the header and of the data blocks.
    writer: W,
    /// Resolved named types used when validating and encoding values.
    resolved_schema: ResolvedSchema<'a>,
    /// Compression codec applied to each data block before it is written.
    codec: Codec,
    /// Number of buffered bytes at which an append triggers an automatic flush.
    block_size: usize,
    /// Encoded values of the current data block that has not been written yet.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// Sync marker written at the end of the header and after every data block.
    marker: [u8; 16],
    /// Whether the file header was already written (or is assumed to exist, when appending).
    has_header: bool,
    /// Extra key/value entries written into the header's metadata map.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56 #[builder]
57 pub fn builder(
58 schema: &'a Schema,
59 schemata: Option<Vec<&'a Schema>>,
60 writer: W,
61 #[builder(default = Codec::Null)] codec: Codec,
62 #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63 #[builder(default = generate_sync_marker())] marker: [u8; 16],
64 #[builder(default = false)]
68 has_header: bool,
69 #[builder(default)] user_metadata: HashMap<String, Value>,
70 ) -> AvroResult<Self> {
71 let resolved_schema = if let Some(schemata) = schemata {
72 ResolvedSchema::try_from(schemata)?
73 } else {
74 ResolvedSchema::try_from(schema)?
75 };
76 Ok(Self {
77 schema,
78 writer,
79 resolved_schema,
80 codec,
81 block_size,
82 buffer: Vec::with_capacity(block_size),
83 num_values: 0,
84 marker,
85 has_header,
86 user_metadata,
87 })
88 }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96 Writer::with_codec(schema, writer, Codec::Null)
97 }
98
99 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102 Self::builder()
103 .schema(schema)
104 .writer(writer)
105 .codec(codec)
106 .build()
107 }
108
109 pub fn with_schemata(
114 schema: &'a Schema,
115 schemata: Vec<&'a Schema>,
116 writer: W,
117 codec: Codec,
118 ) -> AvroResult<Self> {
119 Self::builder()
120 .schema(schema)
121 .schemata(schemata)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132 }
133
134 pub fn append_to_with_codec(
137 schema: &'a Schema,
138 writer: W,
139 codec: Codec,
140 marker: [u8; 16],
141 ) -> AvroResult<Self> {
142 Self::builder()
143 .schema(schema)
144 .writer(writer)
145 .codec(codec)
146 .marker(marker)
147 .has_header(true)
148 .build()
149 }
150
151 pub fn append_to_with_codec_schemata(
154 schema: &'a Schema,
155 schemata: Vec<&'a Schema>,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .schemata(schemata)
163 .writer(writer)
164 .codec(codec)
165 .marker(marker)
166 .has_header(true)
167 .build()
168 }
169
170 pub fn schema(&self) -> &'a Schema {
172 self.schema
173 }
174
175 #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
177 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
178 self.append_value(value)
179 }
180
181 pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189 let avro = value.into();
190 self.append_value_ref(&avro)
191 }
192
193 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201 if let Some(reason) = value.validate_internal(
202 self.schema,
203 self.resolved_schema.get_names(),
204 self.schema.namespace(),
205 ) {
206 return Err(Details::ValidationWithReason {
207 value: value.clone(),
208 schema: self.schema.clone(),
209 reason,
210 }
211 .into());
212 }
213 self.unvalidated_append_value_ref(value)
214 }
215
216 pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228 let value = value.into();
229 self.unvalidated_append_value_ref(&value)
230 }
231
232 pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244 let n = self.maybe_write_header()?;
245 encode_internal(
246 value,
247 self.schema,
248 self.resolved_schema.get_names(),
249 self.schema.namespace(),
250 &mut self.buffer,
251 )?;
252
253 self.num_values += 1;
254
255 if self.buffer.len() >= self.block_size {
256 return self.flush().map(|b| b + n);
257 }
258
259 Ok(n)
260 }
261
262 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272 let n = self.maybe_write_header()?;
273
274 let mut serializer = SchemaAwareWriteSerializer::new(
275 &mut self.buffer,
276 self.schema,
277 self.resolved_schema.get_names(),
278 None,
279 );
280 value.serialize(&mut serializer)?;
281 self.num_values += 1;
282
283 if self.buffer.len() >= self.block_size {
284 return self.flush().map(|b| b + n);
285 }
286
287 Ok(n)
288 }
289
290 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
297 where
298 I: IntoIterator<Item = T>,
299 {
300 let mut num_bytes = 0;
315 for value in values {
316 num_bytes += self.append_value(value)?;
317 }
318 num_bytes += self.flush()?;
319
320 Ok(num_bytes)
321 }
322
323 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
332 where
333 I: IntoIterator<Item = T>,
334 {
335 let mut num_bytes = 0;
350 for value in values {
351 num_bytes += self.append_ser(value)?;
352 }
353 num_bytes += self.flush()?;
354
355 Ok(num_bytes)
356 }
357
358 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
366 let mut num_bytes = 0;
367 for value in values {
368 num_bytes += self.append_value_ref(value)?;
369 }
370 num_bytes += self.flush()?;
371
372 Ok(num_bytes)
373 }
374
375 pub fn flush(&mut self) -> AvroResult<usize> {
383 let mut num_bytes = self.maybe_write_header()?;
384 if self.num_values == 0 {
385 return Ok(num_bytes);
386 }
387
388 self.codec.compress(&mut self.buffer)?;
389
390 let num_values = self.num_values;
391 let stream_len = self.buffer.len();
392
393 num_bytes += self.append_raw(&num_values.try_into()?, &Schema::Long)?
394 + self.append_raw(&stream_len.try_into()?, &Schema::Long)?
395 + self
396 .writer
397 .write(self.buffer.as_ref())
398 .map_err(Details::WriteBytes)?
399 + self.append_marker()?;
400
401 self.buffer.clear();
402 self.num_values = 0;
403
404 self.writer.flush().map_err(Details::FlushWriter)?;
405
406 Ok(num_bytes)
407 }
408
409 pub fn into_inner(mut self) -> AvroResult<W> {
414 self.maybe_write_header()?;
415 self.flush()?;
416
417 let mut this = ManuallyDrop::new(self);
418
419 let _buffer = std::mem::take(&mut this.buffer);
421 let _user_metadata = std::mem::take(&mut this.user_metadata);
422 unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
424
425 let writer = unsafe { std::ptr::read(&this.writer) };
427
428 Ok(writer)
429 }
430
431 pub fn get_ref(&self) -> &W {
436 &self.writer
437 }
438
439 pub fn get_mut(&mut self) -> &mut W {
446 &mut self.writer
447 }
448
449 fn append_marker(&mut self) -> AvroResult<usize> {
451 self.writer
454 .write(&self.marker)
455 .map_err(|e| Details::WriteMarker(e).into())
456 }
457
458 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
460 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
461 }
462
463 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
465 self.writer
466 .write(bytes)
467 .map_err(|e| Details::WriteBytes(e).into())
468 }
469
470 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
473 if !self.has_header {
474 if key.starts_with("avro.") {
475 return Err(Details::InvalidMetadataKey(key).into());
476 }
477 self.user_metadata
478 .insert(key, Value::Bytes(value.as_ref().to_vec()));
479 Ok(())
480 } else {
481 Err(Details::FileHeaderAlreadyWritten.into())
482 }
483 }
484
485 fn header(&self) -> Result<Vec<u8>, Error> {
487 let schema_bytes = serde_json::to_string(self.schema)
488 .map_err(Details::ConvertJsonToString)?
489 .into_bytes();
490
491 let mut metadata = HashMap::with_capacity(2);
492 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
493 if self.codec != Codec::Null {
494 metadata.insert("avro.codec", self.codec.into());
495 }
496 match self.codec {
497 #[cfg(feature = "bzip")]
498 Codec::Bzip2(settings) => {
499 metadata.insert(
500 "avro.codec.compression_level",
501 Value::Bytes(vec![settings.compression_level]),
502 );
503 }
504 #[cfg(feature = "xz")]
505 Codec::Xz(settings) => {
506 metadata.insert(
507 "avro.codec.compression_level",
508 Value::Bytes(vec![settings.compression_level]),
509 );
510 }
511 #[cfg(feature = "zstandard")]
512 Codec::Zstandard(settings) => {
513 metadata.insert(
514 "avro.codec.compression_level",
515 Value::Bytes(vec![settings.compression_level]),
516 );
517 }
518 _ => {}
519 }
520
521 for (k, v) in &self.user_metadata {
522 metadata.insert(k.as_str(), v.clone());
523 }
524
525 let mut header = Vec::new();
526 header.extend_from_slice(AVRO_OBJECT_HEADER);
527 encode(
528 &metadata.into(),
529 &Schema::map(Schema::Bytes).build(),
530 &mut header,
531 )?;
532 header.extend_from_slice(&self.marker);
533
534 Ok(header)
535 }
536
537 fn maybe_write_header(&mut self) -> AvroResult<usize> {
538 if !self.has_header {
539 let header = self.header()?;
540 let n = self.append_bytes(header.as_ref())?;
541 self.has_header = true;
542 Ok(n)
543 } else {
544 Ok(0)
545 }
546 }
547}
548
/// A sink that can be emptied in place so it can be reused.
pub trait Clearable {
    /// Removes all contents, leaving the value empty.
    fn clear(&mut self);
}

impl Clearable for Vec<u8> {
    fn clear(&mut self) {
        // Truncating to zero drops every element but keeps the allocation for reuse.
        self.truncate(0);
    }
}
560
561impl<'a, W: Clearable + Write> Writer<'a, W> {
562 pub fn reset(&mut self) {
600 self.buffer.clear();
601 self.writer.clear();
602 self.has_header = false;
603 self.num_values = 0;
604 self.user_metadata.clear();
605 self.marker = generate_sync_marker();
606 }
607}
608
609impl<W: Write> Drop for Writer<'_, W> {
610 fn drop(&mut self) {
612 let _ = self.maybe_write_header();
613 let _ = self.flush();
614 }
615}
616
617#[cfg(not(target_arch = "wasm32"))]
618fn generate_sync_marker() -> [u8; 16] {
619 rand::random()
620}
621
/// Generates a 16-byte sync marker on wasm32 from four `quad_rand::rand()` draws.
///
/// The draws are laid out one after another in big-endian byte order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
632
// Unit tests for `Writer`: header emission, appending (via `Value` and serde), codecs,
// user metadata, validation errors and `reset`.
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use super::*;
    use crate::{Reader, types::Record, util::zig_i64};
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    use crate::{codec::DeflateSettings, error::Details};
    use apache_avro_test_helper::TestResult;

    /// Length of the container-file magic bytes at the start of every file.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    /// Record schema shared by most tests: a `long` field `a` (default 42) and a
    /// `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    #[test]
    fn avro_rs_220_flush_write_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        // Flushing a writer with no values must still write the (147-byte) header.
        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.flush()?;
        let result = writer.into_inner()?;
        assert_eq!(result.len(), 147);

        // With `has_header(true)` the header is assumed to exist, so nothing is written.
        let mut writer = Writer::builder()
            .has_header(true)
            .schema(&schema)
            .writer(Vec::new())
            .build()?;
        writer.flush()?;
        let result = writer.into_inner()?;
        assert_eq!(result.len(), 0);

        Ok(())
    }

    #[test]
    fn test_writer_append() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        // The per-call byte counts must add up to everything that was written.
        assert_eq!(n1 + n2 + n3, result.len());

        // Expected block payload: two records, each `a` as zig-zag long, then `b` as
        // length-prefixed string.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The payload sits right before the trailing 16-byte sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[test]
    fn test_writer_extend() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The payload sits right before the trailing 16-byte sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    /// Serde counterpart of the `SCHEMA` record.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }

    #[test]
    fn test_writer_append_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };

        let n1 = writer.append_ser(record)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The payload sits right before the trailing 16-byte sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[test]
    fn test_writer_extend_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend_ser(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The payload sits right before the trailing 16-byte sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    /// Deflate-compressed writer built through `with_codec`.
    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::with_codec(
            schema,
            Vec::new(),
            Codec::Deflate(DeflateSettings::default()),
        )
    }

    /// Deflate-compressed writer built through the builder with a 100-byte block size.
    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::builder()
            .writer(Vec::new())
            .schema(schema)
            .codec(Codec::Deflate(DeflateSettings::default()))
            .block_size(100)
            .build()
    }

    /// Appends two records with a Deflate writer and checks that the block payload equals
    /// the Deflate-compressed expected encoding.
    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
        let mut record = Record::new(schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());
        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The payload sits right before the trailing 16-byte sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[test]
    fn test_writer_with_codec() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let writer = make_writer_with_codec(&schema)?;
        check_writer(writer, &schema)
    }

    #[test]
    fn test_writer_with_builder() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let writer = make_writer_with_builder(&schema)?;
        check_writer(writer, &schema)
    }

    #[test]
    fn test_logical_writer() -> TestResult {
        const LOGICAL_TYPE_SCHEMA: &str = r#"
        {
          "type": "record",
          "name": "logical_type_test",
          "fields": [
            {
              "name": "a",
              "type": [
                "null",
                {
                  "type": "long",
                  "logicalType": "timestamp-micros"
                }
              ]
            }
          ]
        }
        "#;
        let codec = Codec::Deflate(DeflateSettings::default());
        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
        let mut writer = Writer::builder()
            .schema(&schema)
            .codec(codec)
            .writer(Vec::new())
            .build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put(
            "a",
            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
        );

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", Value::Union(0, Box::new(Value::Null)));

        let n1 = writer.append_value(record1)?;
        let n2 = writer.append_value(record2)?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        let mut data = Vec::new();
        // record1: union branch index 1 (timestamp-micros), then its long value
        zig_i64(1, &mut data)?;
        zig_i64(1234, &mut data)?;

        // record2: union branch index 0 (null), which carries no payload
        zig_i64(0, &mut data)?;
        codec.compress(&mut data)?;

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The payload sits right before the trailing 16-byte sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
        writer.add_user_metadata("strKey".to_string(), "strValue")?;
        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        // Header (including the four user entries) plus one data block.
        assert_eq!(result.len(), 244);

        Ok(())
    }

    #[test]
    fn test_avro_3881_metadata_empty_body() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.add_user_metadata("a".to_string(), "b")?;
        let result = writer.into_inner()?;

        // A header-only file must be readable and expose the user metadata.
        let reader = Reader::builder(&result[..])
            .reader_schema(&schema)
            .build()?;
        let mut expected = HashMap::new();
        expected.insert("a".to_string(), vec![b'b']);
        assert_eq!(reader.user_metadata(), &expected);
        assert_eq!(reader.into_iter().count(), 0);

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        // Appending writes the header, after which metadata can no longer be added.
        writer.append_value(record.clone())?;

        match writer
            .add_user_metadata("stringKey".to_string(), String::from("value2"))
            .map_err(Error::into_details)
        {
            Err(e @ Details::FileHeaderAlreadyWritten) => {
                assert_eq!(e.to_string(), "The file metadata is already flushed.")
            }
            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
        }

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let key = "avro.stringKey".to_string();
        match writer
            .add_user_metadata(key.clone(), "value")
            .map_err(Error::into_details)
        {
            Err(ref e @ Details::InvalidMetadataKey(_)) => {
                assert_eq!(
                    e.to_string(),
                    format!(
                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
                    )
                )
            }
            Err(e) => panic!(
                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
            ),
            Ok(_) => {
                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
            }
        }

        Ok(())
    }

    #[test]
    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            Value::String("stringValue".to_string()),
        );
        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));

        let writer: Writer<'_, Vec<u8>> = Writer::builder()
            .writer(Vec::new())
            .schema(&schema)
            .user_metadata(user_meta_data.clone())
            .build()?;

        assert_eq!(writer.user_metadata, user_meta_data);

        Ok(())
    }

    #[test]
    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
        const SCHEMA: &str = r#"
        {
          "type": "record",
          "name": "Conference",
          "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
          ]
        }"#;

        // The Rust field `time` must be matched to the schema field `date` through its alias.
        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
        pub struct Conference {
            pub name: String,
            pub time: Option<i64>,
        }

        let conf = Conference {
            name: "RustConf".to_string(),
            time: Some(1234567890),
        };

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let bytes = writer.append_ser(conf)?;

        // The value is only buffered, so the returned count is the header length.
        assert_eq!(182, bytes);

        Ok(())
    }

    #[test]
    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
        const SCHEMA: &str = r#"
        {
          "type": "record",
          "name": "Conference",
          "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
          ]
        }"#;

        #[derive(Debug, PartialEq, Clone, Serialize)]
        pub struct Conference {
            pub name: String,
            pub time: Option<f64>, // f64 deliberately does not match the schema's `long`
        }

        let conf = Conference {
            name: "RustConf".to_string(),
            time: Some(12345678.90),
        };

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        match writer.append_ser(conf) {
            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", .. }, fields: [RecordField { name: "name", schema: String, .. }, RecordField { name: "date", aliases: ["time2", "time"], schema: Union(UnionSchema { schemas: [Null, Long] }), .. }], .. }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long] }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
                );
            }
        }
        Ok(())
    }

    #[test]
    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
        const SCHEMA: &str = r#"
        {
          "type": "record",
          "name": "ExampleSchema",
          "fields": [
            {"name": "exampleField", "type": "string"}
          ]
        }
        "#;

        // Shared in-memory sink, so its length can be inspected while a `BufWriter` owns a clone.
        #[derive(Clone, Default)]
        struct TestBuffer(Rc<RefCell<Vec<u8>>>);

        impl TestBuffer {
            fn len(&self) -> usize {
                self.0.borrow().len()
            }
        }

        impl Write for TestBuffer {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                self.0.borrow_mut().write(buf)
            }

            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        let shared_buffer = TestBuffer::default();

        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());

        let schema = Schema::parse_str(SCHEMA)?;

        let mut writer = Writer::new(&schema, buffered_writer)?;

        let mut record = Record::new(writer.schema()).unwrap();
        record.put("exampleField", "value");

        writer.append_value(record)?;
        writer.flush()?;

        // `Writer::flush` must also flush the `BufWriter`, pushing everything to the sink.
        assert_eq!(
            shared_buffer.len(),
            151,
            "the test buffer was not fully written to after Writer::flush was called"
        );

        Ok(())
    }

    #[test]
    fn avro_rs_310_append_unvalidated_value() -> TestResult {
        let schema = Schema::String;
        let value = Value::Int(1);

        // The unvalidated APIs encode the mismatching value as-is.
        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.unvalidated_append_value_ref(&value)?;
        writer.unvalidated_append_value(value)?;
        let buffer = writer.into_inner()?;

        // The two `Int(1)` values (zig-zag encoded as 2) sit right before the 16-byte marker.
        assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);

        // The validating APIs reject the same value.
        let mut writer = Writer::new(&schema, Vec::new())?;
        let value = Value::Int(1);
        let err = writer.append_value_ref(&value).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
        );
        let err = writer.append_value(value).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
        );

        Ok(())
    }

    #[test]
    fn avro_rs_469_reset_writer() -> TestResult {
        let schema = Schema::Boolean;
        let values = [true, false, true, false];
        let mut writer = Writer::new(&schema, Vec::new())?;

        for value in values {
            writer.append_value(value)?;
        }

        writer.flush()?;
        let first_buffer = writer.get_ref().clone();

        // `reset` clears the sink and forces a new header with a new sync marker.
        writer.reset();
        assert_eq!(writer.get_ref().len(), 0);

        for value in values {
            writer.append_value(value)?;
        }

        writer.flush()?;
        let second_buffer = writer.get_ref().clone();
        assert_eq!(first_buffer.len(), second_buffer.len());
        let len = first_buffer.len();
        // Layout from the end of the file:
        // - trailing block sync marker (16 bytes);
        // - 6 block bytes: value count, block length and the four booleans;
        // - the header's own sync marker (16 bytes).
        // `header` is therefore where the header's sync marker starts, and `data` is where
        // the block starts.
        let header = len - 16 - 6 - 16;
        let data = header + 16;
        assert_eq!(
            first_buffer[..header],
            second_buffer[..header],
            "Written header must be the same, excluding sync marker"
        );
        assert_ne!(
            first_buffer[header..data],
            second_buffer[header..data],
            "Sync markers should be different"
        );
        assert_eq!(
            first_buffer[data..data + 6],
            second_buffer[data..data + 6],
            "Written data must be the same"
        );
        assert_ne!(
            first_buffer[len - 16..],
            second_buffer[len - 16..],
            "Sync markers should be different"
        );

        Ok(())
    }
}