1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 schema::{ResolvedSchema, Schema},
24 serde::ser_schema::{Config, SchemaAwareSerializer},
25 types::Value,
26 util::is_human_readable,
27};
28use serde::Serialize;
29use std::{collections::HashMap, io::Write, mem::ManuallyDrop};
30
// Sibling writer modules; their contents are not visible here.
// NOTE(review): presumably `datum` writes bare datums without container framing and
// `single_object` implements Avro single-object encoding — confirm against the modules.
pub mod datum;
pub mod single_object;
33
/// Default flush threshold, in bytes of buffered (uncompressed) encoded values, used when the
/// builder is not given an explicit `block_size`.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes (`O`, `b`, `j`, version 1) that open every Avro object container file header.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
36
/// Writes Avro values to `W` as an object container file: a header (magic, metadata map, sync
/// marker) followed by blocks of encoded values, each block terminated by the sync marker.
///
/// Values are encoded into an in-memory block buffer and only written out when the buffer
/// reaches `block_size`, on an explicit `flush`, on `into_inner`, or on drop.
pub struct Writer<'a, W: Write> {
    // Schema every appended value is encoded with; also serialized into the header.
    schema: &'a Schema,
    // Destination of the header and the encoded blocks.
    writer: W,
    // Named types resolved from `schema` (or from the builder's `schemata`), used for
    // validation and encoding.
    resolved_schema: ResolvedSchema<'a>,
    // Compression applied to each block in `flush`; advertised in the header when not `Null`.
    codec: Codec,
    // Once `buffer` holds at least this many bytes after an append, the block is flushed.
    block_size: usize,
    // Encoded (not yet compressed) values of the current, unflushed block.
    buffer: Vec<u8>,
    // Number of values currently encoded in `buffer`.
    num_values: usize,
    // Sync marker written at the end of the header and after every block.
    marker: [u8; 16],
    // Whether the header was already written (or is assumed to exist, e.g. when appending).
    has_header: bool,
    // Extra header metadata entries supplied by the user.
    user_metadata: HashMap<String, Value>,
    // Forwarded to the serde serializer config as `human_readable`.
    human_readable: bool,
    // Forwarded to the serde serializer config as `target_block_size`.
    map_array_target_block_size: Option<usize>,
}
56
57#[bon::bon]
58impl<'a, W: Write> Writer<'a, W> {
59 #[builder]
60 pub fn builder(
61 schema: &'a Schema,
62 schemata: Option<Vec<&'a Schema>>,
63 writer: W,
64 #[builder(default = Codec::Null)] codec: Codec,
65 #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
66 #[builder(default = generate_sync_marker())] marker: [u8; 16],
67 #[builder(default = false)]
71 has_header: bool,
72 #[builder(default)] user_metadata: HashMap<String, Value>,
73 #[builder(default = is_human_readable())]
77 human_readable: bool,
78 map_array_target_block_size: Option<usize>,
86 ) -> AvroResult<Self> {
87 let resolved_schema = if let Some(schemata) = schemata {
88 ResolvedSchema::try_from(schemata)?
89 } else {
90 ResolvedSchema::try_from(schema)?
91 };
92 Ok(Self {
93 schema,
94 writer,
95 resolved_schema,
96 codec,
97 block_size,
98 buffer: Vec::with_capacity(block_size),
99 num_values: 0,
100 marker,
101 has_header,
102 user_metadata,
103 human_readable,
104 map_array_target_block_size,
105 })
106 }
107}
108
109impl<'a, W: Write> Writer<'a, W> {
110 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
114 Writer::with_codec(schema, writer, Codec::Null)
115 }
116
117 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
120 Self::builder()
121 .schema(schema)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn with_schemata(
132 schema: &'a Schema,
133 schemata: Vec<&'a Schema>,
134 writer: W,
135 codec: Codec,
136 ) -> AvroResult<Self> {
137 Self::builder()
138 .schema(schema)
139 .schemata(schemata)
140 .writer(writer)
141 .codec(codec)
142 .build()
143 }
144
145 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
149 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
150 }
151
152 pub fn append_to_with_codec(
155 schema: &'a Schema,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .writer(writer)
163 .codec(codec)
164 .marker(marker)
165 .has_header(true)
166 .build()
167 }
168
169 pub fn append_to_with_codec_schemata(
172 schema: &'a Schema,
173 schemata: Vec<&'a Schema>,
174 writer: W,
175 codec: Codec,
176 marker: [u8; 16],
177 ) -> AvroResult<Self> {
178 Self::builder()
179 .schema(schema)
180 .schemata(schemata)
181 .writer(writer)
182 .codec(codec)
183 .marker(marker)
184 .has_header(true)
185 .build()
186 }
187
188 pub fn schema(&self) -> &'a Schema {
190 self.schema
191 }
192
193 #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
195 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
196 self.append_value(value)
197 }
198
199 pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
207 let avro = value.into();
208 self.append_value_ref(&avro)
209 }
210
211 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
219 if let Some(reason) = value.validate_internal(
220 self.schema,
221 self.resolved_schema.get_names(),
222 self.schema.namespace(),
223 ) {
224 return Err(Details::ValidationWithReason {
225 value: value.clone(),
226 schema: self.schema.clone(),
227 reason,
228 }
229 .into());
230 }
231 self.unvalidated_append_value_ref(value)
232 }
233
234 pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
246 let value = value.into();
247 self.unvalidated_append_value_ref(&value)
248 }
249
250 pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
262 let n = self.maybe_write_header()?;
263 encode_internal(
264 value,
265 self.schema,
266 self.resolved_schema.get_names(),
267 self.schema.namespace(),
268 &mut self.buffer,
269 )?;
270
271 self.num_values += 1;
272
273 if self.buffer.len() >= self.block_size {
274 return self.flush().map(|b| b + n);
275 }
276
277 Ok(n)
278 }
279
280 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
290 let n = self.maybe_write_header()?;
291
292 let config = Config {
293 names: self.resolved_schema.get_names(),
294 target_block_size: self.map_array_target_block_size,
295 human_readable: self.human_readable,
296 };
297
298 value.serialize(SchemaAwareSerializer::new(
299 &mut self.buffer,
300 self.schema,
301 config,
302 )?)?;
303 self.num_values += 1;
304
305 if self.buffer.len() >= self.block_size {
306 return self.flush().map(|b| b + n);
307 }
308
309 Ok(n)
310 }
311
312 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
319 where
320 I: IntoIterator<Item = T>,
321 {
322 let mut num_bytes = 0;
337 for value in values {
338 num_bytes += self.append_value(value)?;
339 }
340 num_bytes += self.flush()?;
341
342 Ok(num_bytes)
343 }
344
345 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
354 where
355 I: IntoIterator<Item = T>,
356 {
357 let mut num_bytes = 0;
372 for value in values {
373 num_bytes += self.append_ser(value)?;
374 }
375 num_bytes += self.flush()?;
376
377 Ok(num_bytes)
378 }
379
380 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
388 let mut num_bytes = 0;
389 for value in values {
390 num_bytes += self.append_value_ref(value)?;
391 }
392 num_bytes += self.flush()?;
393
394 Ok(num_bytes)
395 }
396
397 pub fn flush(&mut self) -> AvroResult<usize> {
405 let mut num_bytes = self.maybe_write_header()?;
406 if self.num_values == 0 {
407 return Ok(num_bytes);
408 }
409
410 self.codec.compress(&mut self.buffer)?;
411
412 let num_values = self.num_values;
413 let stream_len = self.buffer.len();
414
415 num_bytes += self.append_raw(&num_values.try_into()?, &Schema::Long)?
416 + self.append_raw(&stream_len.try_into()?, &Schema::Long)?
417 + self
418 .writer
419 .write(self.buffer.as_ref())
420 .map_err(Details::WriteBytes)?
421 + self.append_marker()?;
422
423 self.buffer.clear();
424 self.num_values = 0;
425
426 self.writer.flush().map_err(Details::FlushWriter)?;
427
428 Ok(num_bytes)
429 }
430
431 pub fn into_inner(mut self) -> AvroResult<W> {
436 self.maybe_write_header()?;
437 self.flush()?;
438
439 let mut this = ManuallyDrop::new(self);
440
441 let _buffer = std::mem::take(&mut this.buffer);
443 let _user_metadata = std::mem::take(&mut this.user_metadata);
444 unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
446
447 let writer = unsafe { std::ptr::read(&this.writer) };
449
450 Ok(writer)
451 }
452
453 pub fn get_ref(&self) -> &W {
458 &self.writer
459 }
460
461 pub fn get_mut(&mut self) -> &mut W {
468 &mut self.writer
469 }
470
471 fn append_marker(&mut self) -> AvroResult<usize> {
473 self.writer
476 .write(&self.marker)
477 .map_err(|e| Details::WriteMarker(e).into())
478 }
479
480 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
482 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
483 }
484
485 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
487 self.writer
488 .write(bytes)
489 .map_err(|e| Details::WriteBytes(e).into())
490 }
491
492 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
495 if !self.has_header {
496 if key.starts_with("avro.") {
497 return Err(Details::InvalidMetadataKey(key).into());
498 }
499 self.user_metadata
500 .insert(key, Value::Bytes(value.as_ref().to_vec()));
501 Ok(())
502 } else {
503 Err(Details::FileHeaderAlreadyWritten.into())
504 }
505 }
506
507 fn header(&self) -> Result<Vec<u8>, Error> {
509 let schema_bytes = serde_json::to_string(self.schema)
510 .map_err(Details::ConvertJsonToString)?
511 .into_bytes();
512
513 let mut metadata = HashMap::with_capacity(2);
514 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
515 if self.codec != Codec::Null {
516 metadata.insert("avro.codec", self.codec.into());
517 }
518 match self.codec {
519 #[cfg(feature = "bzip")]
520 Codec::Bzip2(settings) => {
521 metadata.insert(
522 "avro.codec.compression_level",
523 Value::Bytes(vec![settings.compression_level]),
524 );
525 }
526 #[cfg(feature = "xz")]
527 Codec::Xz(settings) => {
528 metadata.insert(
529 "avro.codec.compression_level",
530 Value::Bytes(vec![settings.compression_level]),
531 );
532 }
533 #[cfg(feature = "zstandard")]
534 Codec::Zstandard(settings) => {
535 metadata.insert(
536 "avro.codec.compression_level",
537 Value::Bytes(vec![settings.compression_level]),
538 );
539 }
540 _ => {}
541 }
542
543 for (k, v) in &self.user_metadata {
544 metadata.insert(k.as_str(), v.clone());
545 }
546
547 let mut header = Vec::new();
548 header.extend_from_slice(AVRO_OBJECT_HEADER);
549 encode(
550 &metadata.into(),
551 &Schema::map(Schema::Bytes).build(),
552 &mut header,
553 )?;
554 header.extend_from_slice(&self.marker);
555
556 Ok(header)
557 }
558
559 fn maybe_write_header(&mut self) -> AvroResult<usize> {
560 if !self.has_header {
561 let header = self.header()?;
562 let n = self.append_bytes(header.as_ref())?;
563 self.has_header = true;
564 Ok(n)
565 } else {
566 Ok(0)
567 }
568 }
569}
570
/// A destination whose contents can be discarded in place, allowing a `Writer` to be `reset`
/// and reused for a new file.
pub trait Clearable {
    /// Removes all previously written content.
    fn clear(&mut self);
}

impl Clearable for Vec<u8> {
    /// Drops every byte while keeping the allocated capacity.
    fn clear(&mut self) {
        self.truncate(0);
    }
}
582
583impl<'a, W: Clearable + Write> Writer<'a, W> {
584 pub fn reset(&mut self) {
622 self.buffer.clear();
623 self.writer.clear();
624 self.has_header = false;
625 self.num_values = 0;
626 self.user_metadata.clear();
627 self.marker = generate_sync_marker();
628 }
629}
630
631impl<W: Write> Drop for Writer<'_, W> {
632 fn drop(&mut self) {
634 let _ = self.maybe_write_header();
635 let _ = self.flush();
636 }
637}
638
639#[cfg(not(target_arch = "wasm32"))]
640fn generate_sync_marker() -> [u8; 16] {
641 rand::random()
642}
643
/// Generates a 16-byte sync marker on wasm32 from four `quad_rand` words, each stored
/// big-endian in consecutive 4-byte slots.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut sync_marker = [0_u8; 16];
    for slot in sync_marker.chunks_exact_mut(4) {
        slot.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    sync_marker
}
654
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, rc::Rc};

    use super::*;
    use crate::{Reader, types::Record, util::zig_i64};
    use pretty_assertions::assert_eq;
    use serde::{Deserialize, Serialize};

    use crate::{codec::DeflateSettings, error::Details};
    use apache_avro_test_helper::TestResult;

    // Length of the container magic that starts every written file.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    // Record shared by most tests: `a` is a long (default 42), `b` a string.
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {
                "name": "a",
                "type": "long",
                "default": 42
            },
            {
                "name": "b",
                "type": "string"
            }
        ]
    }
    "#;

    // `flush` on an empty writer must still emit the header, unless the writer was told the
    // header already exists.
    #[test]
    fn avro_rs_220_flush_write_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        // Header only: magic, metadata map and sync marker.
        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.flush()?;
        let result = writer.into_inner()?;
        assert_eq!(result.len(), 147);

        // With `has_header(true)` nothing at all is written.
        let mut writer = Writer::builder()
            .has_header(true)
            .schema(&schema)
            .writer(Vec::new())
            .build()?;
        writer.flush()?;
        let result = writer.into_inner()?;
        assert_eq!(result.len(), 0);

        Ok(())
    }

    // The byte counts returned by append/flush add up to the output size, and the block data
    // holds the two encoded records right before the trailing sync marker.
    #[test]
    fn test_writer_append() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        // One record: zig-zag long 27, then string length 3 and "foo"; duplicated for two records.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The last 16 bytes are the block's sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    // Same as `test_writer_append`, going through `extend` (which flushes by itself).
    #[test]
    fn test_writer_extend() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The last 16 bytes are the block's sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    // Serde-side equivalent of the `test` record schema.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }

    // `append_ser` produces the same encoding as appending a `Value` record.
    #[test]
    fn test_writer_append_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };

        let n1 = writer.append_ser(record)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The last 16 bytes are the block's sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    // `extend_ser` with two serde records.
    #[test]
    fn test_writer_extend_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend_ser(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The last 16 bytes are the block's sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    // Deflate writer built with the convenience constructor.
    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::with_codec(
            schema,
            Vec::new(),
            Codec::Deflate(DeflateSettings::default()),
        )
    }

    // Deflate writer built with the builder and a small block size.
    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::builder()
            .writer(Vec::new())
            .schema(schema)
            .codec(Codec::Deflate(DeflateSettings::default()))
            .block_size(100)
            .build()
    }

    // Writes two records with a Deflate writer and checks the compressed block bytes.
    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
        let mut record = Record::new(schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());
        // The block holds the compressed form of both records.
        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The last 16 bytes are the block's sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    #[test]
    fn test_writer_with_codec() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let writer = make_writer_with_codec(&schema)?;
        check_writer(writer, &schema)
    }

    #[test]
    fn test_writer_with_builder() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let writer = make_writer_with_builder(&schema)?;
        check_writer(writer, &schema)
    }

    // A nullable timestamp-micros union field is encoded as branch index + payload.
    #[test]
    fn test_logical_writer() -> TestResult {
        const LOGICAL_TYPE_SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "logical_type_test",
            "fields": [
                {
                    "name": "a",
                    "type": [
                        "null",
                        {
                            "type": "long",
                            "logicalType": "timestamp-micros"
                        }
                    ]
                }
            ]
        }
        "#;
        let codec = Codec::Deflate(DeflateSettings::default());
        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
        let mut writer = Writer::builder()
            .schema(&schema)
            .codec(codec)
            .writer(Vec::new())
            .build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put(
            "a",
            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
        );

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", Value::Union(0, Box::new(Value::Null)));

        let n1 = writer.append_value(record1)?;
        let n2 = writer.append_value(record2)?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        let mut data = Vec::new();
        // record1: union branch 1 (timestamp-micros) followed by the long 1234.
        zig_i64(1, &mut data)?;
        zig_i64(1234, &mut data)?;

        // record2: union branch 0 (null) carries no payload.
        zig_i64(0, &mut data)?;
        codec.compress(&mut data)?;

        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // The last 16 bytes are the block's sync marker.
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }

    // User metadata of several `AsRef<[u8]>` types is accepted and ends up in the header.
    #[test]
    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
        writer.add_user_metadata("strKey".to_string(), "strValue")?;
        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(result.len(), 244);

        Ok(())
    }

    // A file with metadata but no values is readable and exposes the metadata.
    #[test]
    fn test_avro_3881_metadata_empty_body() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.add_user_metadata("a".to_string(), "b")?;
        let result = writer.into_inner()?;

        let reader = Reader::builder(&result[..])
            .reader_schema(&schema)
            .build()?;
        let mut expected = HashMap::new();
        expected.insert("a".to_string(), vec![b'b']);
        assert_eq!(reader.user_metadata(), &expected);
        assert_eq!(reader.into_iter().count(), 0);

        Ok(())
    }

    // Metadata cannot be added once appending a value has written the header.
    #[test]
    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        writer.append_value(record.clone())?;

        match writer
            .add_user_metadata("stringKey".to_string(), String::from("value2"))
            .map_err(Error::into_details)
        {
            Err(e @ Details::FileHeaderAlreadyWritten) => {
                assert_eq!(e.to_string(), "The file metadata is already flushed.");
            }
            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
        }

        Ok(())
    }

    // Keys with the reserved `avro.` prefix are rejected by `add_user_metadata`.
    #[test]
    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let key = "avro.stringKey".to_string();
        match writer
            .add_user_metadata(key.clone(), "value")
            .map_err(Error::into_details)
        {
            Err(ref e @ Details::InvalidMetadataKey(_)) => {
                assert_eq!(
                    e.to_string(),
                    format!(
                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
                    )
                );
            }
            Err(e) => panic!(
                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
            ),
            Ok(_) => {
                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
            }
        }

        Ok(())
    }

    // Metadata passed to the builder is stored unchanged.
    #[test]
    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            Value::String("stringValue".to_string()),
        );
        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));

        let writer: Writer<'_, Vec<u8>> = Writer::builder()
            .writer(Vec::new())
            .schema(&schema)
            .user_metadata(user_meta_data.clone())
            .build()?;

        assert_eq!(writer.user_metadata, user_meta_data);

        Ok(())
    }

    // A serde field named after an alias (`time`) is serialized into the aliased field (`date`).
    #[test]
    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
        const SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "Conference",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
            ]
        }"#;

        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
        pub struct Conference {
            pub name: String,
            pub time: Option<i64>,
        }

        let conf = Conference {
            name: "RustConf".to_string(),
            time: Some(1234567890),
        };

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        // Only the header reaches the underlying writer; the record stays in the block buffer.
        let bytes = writer.append_ser(conf)?;

        assert_eq!(182, bytes);

        Ok(())
    }

    // A serde value of the wrong type yields an error naming the field and the expected schema.
    #[test]
    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
        const SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "Conference",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
            ]
        }"#;

        #[derive(Debug, PartialEq, Clone, Serialize)]
        pub struct Conference {
            pub name: String,
            // Deliberately `f64`, which matches neither branch of the `["null", "long"]` union.
            pub time: Option<f64>,
        }

        let conf = Conference {
            name: "RustConf".to_string(),
            time: Some(12345678.90),
        };

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        match writer.append_ser(conf) {
            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    r#"Failed to serialize field 'date' of record RecordSchema { name: Name { name: "Conference", .. }, fields: [RecordField { name: "name", schema: String, .. }, RecordField { name: "date", aliases: ["time2", "time"], schema: Union(UnionSchema { schemas: [Null, Long] }), .. }], .. }: Failed to serialize value of type `f64`: Expected Schema::Double"#
                );
            }
        }
        Ok(())
    }

    // `Writer::flush` must also flush a buffered inner writer (here a `BufWriter`).
    #[test]
    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
        const SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "ExampleSchema",
            "fields": [
                {"name": "exampleField", "type": "string"}
            ]
        }
        "#;

        // Shared in-memory sink, so its length can be observed while the `BufWriter` owns a clone.
        #[derive(Clone, Default)]
        struct TestBuffer(Rc<RefCell<Vec<u8>>>);

        impl TestBuffer {
            fn len(&self) -> usize {
                self.0.borrow().len()
            }
        }

        impl Write for TestBuffer {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                self.0.borrow_mut().write(buf)
            }

            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        let shared_buffer = TestBuffer::default();

        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());

        let schema = Schema::parse_str(SCHEMA)?;

        let mut writer = Writer::new(&schema, buffered_writer)?;

        let mut record = Record::new(writer.schema()).unwrap();
        record.put("exampleField", "value");

        writer.append_value(record)?;
        writer.flush()?;

        assert_eq!(
            shared_buffer.len(),
            151,
            "the test buffer was not fully written to after Writer::flush was called"
        );

        Ok(())
    }

    // Unvalidated appends encode values that don't match the schema; validated appends reject them.
    #[test]
    fn avro_rs_310_append_unvalidated_value() -> TestResult {
        let schema = Schema::String;
        let value = Value::Int(1);

        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.unvalidated_append_value_ref(&value)?;
        writer.unvalidated_append_value(value)?;
        let buffer = writer.into_inner()?;

        // Each `Int(1)` is zig-zag encoded as the single byte 2; both sit right before the
        // trailing 16-byte sync marker.
        assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);

        let mut writer = Writer::new(&schema, Vec::new())?;
        let value = Value::Int(1);
        let err = writer.append_value_ref(&value).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
        );
        let err = writer.append_value(value).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
        );

        Ok(())
    }

    // After `reset`, writing the same data yields the same file except for the sync marker.
    #[test]
    fn avro_rs_469_reset_writer() -> TestResult {
        let schema = Schema::Boolean;
        let values = [true, false, true, false];
        let mut writer = Writer::new(&schema, Vec::new())?;

        for value in values {
            writer.append_value(value)?;
        }

        writer.flush()?;
        let first_buffer = writer.get_ref().clone();

        writer.reset();
        assert_eq!(writer.get_ref().len(), 0);

        for value in values {
            writer.append_value(value)?;
        }

        writer.flush()?;
        let second_buffer = writer.get_ref().clone();
        assert_eq!(first_buffer.len(), second_buffer.len());
        let len = first_buffer.len();
        // Layout from the end of the file:
        //   [header w/o marker][header sync marker: 16][block: 6][block sync marker: 16]
        // The 6 block bytes are the count (4 -> zig-zag 8), the size (4 -> zig-zag 8) and
        // one byte per boolean.
        let header = len - 16 - 6 - 16;
        let data = header + 16;
        assert_eq!(
            first_buffer[..header],
            second_buffer[..header],
            "Written header must be the same, excluding sync marker"
        );
        assert_ne!(
            first_buffer[header..data],
            second_buffer[header..data],
            "Sync markers should be different"
        );
        assert_eq!(
            first_buffer[data..data + 6],
            second_buffer[data..data + 6],
            "Written data must be the same"
        );
        assert_ne!(
            first_buffer[len - 16..],
            second_buffer[len - 16..],
            "Sync markers should be different"
        );

        Ok(())
    }
}