1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 serde::ser_schema::SchemaAwareWriteSerializer,
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default for the builder's `block_size`: the buffered byte count at or above which an
/// append triggers a flush.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Leading bytes of the header produced by `Writer::header`: `Obj` followed by byte `0x01`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Buffers encoded values and emits them to `writer` in blocks, each preceded by a value
/// count and byte length and followed by the sync `marker`.
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is validated against and encoded with.
    schema: &'a Schema,
    /// Destination for the header and the data blocks.
    writer: W,
    /// Name lookup built from `schema`, or from the schemata passed to the builder.
    resolved_schema: ResolvedSchema<'a>,
    /// Codec applied to `buffer` before a block is written.
    codec: Codec,
    /// Buffer length (bytes) at or above which an append flushes the block.
    block_size: usize,
    /// Encoded values of the current, not yet flushed block.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// 16-byte sync marker written at the end of the header and after every block.
    marker: [u8; 16],
    /// `true` once the header was written, or when the caller declared it already present.
    has_header: bool,
    /// Extra metadata entries emitted as part of the header.
    user_metadata: HashMap<String, Value>,
}
53
#[bon::bon]
impl<'a, W: Write> Writer<'a, W> {
    /// Builder entry point for a `Writer`.
    ///
    /// Only `schema` and `writer` are required; every other setting has a default.
    ///
    /// # Errors
    /// Fails when the schema (or the supplied schemata) cannot be resolved.
    #[builder]
    pub fn builder(
        schema: &'a Schema,
        // Optional set of schemata used for name resolution instead of `schema` alone.
        schemata: Option<Vec<&'a Schema>>,
        writer: W,
        #[builder(default = Codec::Null)] codec: Codec,
        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
        #[builder(default = generate_sync_marker())] marker: [u8; 16],
        // Set to `true` to declare the header as already present, so it is never written.
        #[builder(default = false)]
        has_header: bool,
        #[builder(default)] user_metadata: HashMap<String, Value>,
    ) -> AvroResult<Self> {
        // Resolve names from the explicit schemata when given, otherwise from `schema` itself.
        let resolved_schema = if let Some(schemata) = schemata {
            ResolvedSchema::try_from(schemata)?
        } else {
            ResolvedSchema::try_from(schema)?
        };
        Ok(Self {
            schema,
            writer,
            resolved_schema,
            codec,
            block_size,
            // Pre-size the block buffer to the flush threshold.
            buffer: Vec::with_capacity(block_size),
            num_values: 0,
            marker,
            has_header,
            user_metadata,
        })
    }
}
90
91impl<'a, W: Write> Writer<'a, W> {
92 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96 Writer::with_codec(schema, writer, Codec::Null)
97 }
98
99 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102 Self::builder()
103 .schema(schema)
104 .writer(writer)
105 .codec(codec)
106 .build()
107 }
108
109 pub fn with_schemata(
114 schema: &'a Schema,
115 schemata: Vec<&'a Schema>,
116 writer: W,
117 codec: Codec,
118 ) -> AvroResult<Self> {
119 Self::builder()
120 .schema(schema)
121 .schemata(schemata)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132 }
133
134 pub fn append_to_with_codec(
137 schema: &'a Schema,
138 writer: W,
139 codec: Codec,
140 marker: [u8; 16],
141 ) -> AvroResult<Self> {
142 Self::builder()
143 .schema(schema)
144 .writer(writer)
145 .codec(codec)
146 .marker(marker)
147 .has_header(true)
148 .build()
149 }
150
151 pub fn append_to_with_codec_schemata(
154 schema: &'a Schema,
155 schemata: Vec<&'a Schema>,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .schemata(schemata)
163 .writer(writer)
164 .codec(codec)
165 .marker(marker)
166 .has_header(true)
167 .build()
168 }
169
    /// Returns the schema this writer was created with.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }
174
175 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184 let n = self.maybe_write_header()?;
185
186 let avro = value.into();
187 self.append_value_ref(&avro).map(|m| m + n)
188 }
189
190 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198 let n = self.maybe_write_header()?;
199
200 write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201 self.num_values += 1;
202
203 if self.buffer.len() >= self.block_size {
204 return self.flush().map(|b| b + n);
205 }
206
207 Ok(n)
208 }
209
210 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220 let n = self.maybe_write_header()?;
221
222 let mut serializer = SchemaAwareWriteSerializer::new(
223 &mut self.buffer,
224 self.schema,
225 self.resolved_schema.get_names(),
226 None,
227 );
228 value.serialize(&mut serializer)?;
229 self.num_values += 1;
230
231 if self.buffer.len() >= self.block_size {
232 return self.flush().map(|b| b + n);
233 }
234
235 Ok(n)
236 }
237
238 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246 where
247 I: IntoIterator<Item = T>,
248 {
249 let mut num_bytes = 0;
264 for value in values {
265 num_bytes += self.append(value)?;
266 }
267 num_bytes += self.flush()?;
268
269 Ok(num_bytes)
270 }
271
272 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281 where
282 I: IntoIterator<Item = T>,
283 {
284 let mut num_bytes = 0;
299 for value in values {
300 num_bytes += self.append_ser(value)?;
301 }
302 num_bytes += self.flush()?;
303
304 Ok(num_bytes)
305 }
306
307 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315 let mut num_bytes = 0;
316 for value in values {
317 num_bytes += self.append_value_ref(value)?;
318 }
319 num_bytes += self.flush()?;
320
321 Ok(num_bytes)
322 }
323
324 pub fn flush(&mut self) -> AvroResult<usize> {
332 let mut num_bytes = self.maybe_write_header()?;
333 if self.num_values == 0 {
334 return Ok(num_bytes);
335 }
336
337 self.codec.compress(&mut self.buffer)?;
338
339 let num_values = self.num_values;
340 let stream_len = self.buffer.len();
341
342 num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343 + self.append_raw(&stream_len.into(), &Schema::Long)?
344 + self
345 .writer
346 .write(self.buffer.as_ref())
347 .map_err(Details::WriteBytes)?
348 + self.append_marker()?;
349
350 self.buffer.clear();
351 self.num_values = 0;
352
353 self.writer.flush().map_err(Details::FlushWriter)?;
354
355 Ok(num_bytes)
356 }
357
    /// Flushes pending data and returns the underlying writer, consuming `self`.
    ///
    /// # Errors
    /// Returns the error from writing the header or from flushing.
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.maybe_write_header()?;
        self.flush()?;

        // Suppress `Writer`'s `Drop` impl: everything is flushed and `writer` is moved out below.
        let mut this = ManuallyDrop::new(self);

        // `Drop` no longer runs for `this`, so the owning fields are released by hand.
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);
        // SAFETY: `this` is wrapped in `ManuallyDrop` and `resolved_schema` is not accessed
        // again, so the field is dropped exactly once.
        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };

        // SAFETY: `this` is never dropped and `writer` is not accessed again after this read,
        // so ownership of the writer is transferred exactly once.
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
379
    /// Returns a shared reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
387
    /// Returns a mutable reference to the underlying writer.
    ///
    /// NOTE(review): bytes written through this reference bypass the block framing done by
    /// `flush` — presumably callers should avoid writing to it directly; confirm.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
397
398 fn append_marker(&mut self) -> AvroResult<usize> {
400 self.writer
403 .write(&self.marker)
404 .map_err(|e| Details::WriteMarker(e).into())
405 }
406
407 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410 }
411
412 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414 self.writer
415 .write(bytes)
416 .map_err(|e| Details::WriteBytes(e).into())
417 }
418
419 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422 if !self.has_header {
423 if key.starts_with("avro.") {
424 return Err(Details::InvalidMetadataKey(key).into());
425 }
426 self.user_metadata
427 .insert(key, Value::Bytes(value.as_ref().to_vec()));
428 Ok(())
429 } else {
430 Err(Details::FileHeaderAlreadyWritten.into())
431 }
432 }
433
    /// Builds the serialized header: `AVRO_OBJECT_HEADER`, the metadata map, then the marker.
    ///
    /// # Errors
    /// Fails when the schema cannot be converted to a JSON string or the metadata map
    /// cannot be encoded.
    fn header(&self) -> Result<Vec<u8>, Error> {
        let schema_bytes = serde_json::to_string(self.schema)
            .map_err(Details::ConvertJsonToString)?
            .into_bytes();

        let mut metadata = HashMap::with_capacity(2);
        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
        // `avro.codec` is only recorded for a non-null codec.
        if self.codec != Codec::Null {
            metadata.insert("avro.codec", self.codec.into());
        }
        // Codecs that carry settings additionally record their compression level as one byte.
        match self.codec {
            #[cfg(feature = "bzip")]
            Codec::Bzip2(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            #[cfg(feature = "xz")]
            Codec::Xz(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            #[cfg(feature = "zstandard")]
            Codec::Zstandard(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            _ => {}
        }

        // User entries are inserted last, so one with an identical key replaces the entry above.
        for (k, v) in &self.user_metadata {
            metadata.insert(k.as_str(), v.clone());
        }

        let mut header = Vec::new();
        header.extend_from_slice(AVRO_OBJECT_HEADER);
        // The metadata is encoded as a map whose values are bytes.
        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
        header.extend_from_slice(&self.marker);

        Ok(header)
    }
481
482 fn maybe_write_header(&mut self) -> AvroResult<usize> {
483 if !self.has_header {
484 let header = self.header()?;
485 let n = self.append_bytes(header.as_ref())?;
486 self.has_header = true;
487 Ok(n)
488 } else {
489 Ok(0)
490 }
491 }
492}
493
/// Best-effort flush when a `Writer` goes out of scope.
impl<W: Write> Drop for Writer<'_, W> {
    fn drop(&mut self) {
        // `drop` cannot report failures, so both results are deliberately discarded; use
        // `flush` or `into_inner` to observe write errors.
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
501
502fn write_avro_datum<T: Into<Value>, W: Write>(
508 schema: &Schema,
509 value: T,
510 writer: &mut W,
511) -> Result<(), Error> {
512 let avro = value.into();
513 if !avro.validate(schema) {
514 return Err(Details::Validation.into());
515 }
516 encode(&avro, schema, writer)?;
517 Ok(())
518}
519
520fn write_avro_datum_schemata<T: Into<Value>>(
521 schema: &Schema,
522 schemata: Vec<&Schema>,
523 value: T,
524 buffer: &mut Vec<u8>,
525) -> AvroResult<usize> {
526 let avro = value.into();
527 let rs = ResolvedSchema::try_from(schemata)?;
528 let names = rs.get_names();
529 let enclosing_namespace = schema.namespace();
530 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
531 return Err(Details::Validation.into());
532 }
533 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
534}
535
/// Writer that emits each value as a header followed by the encoded datum.
pub struct GenericSingleObjectWriter {
    /// Holds the header bytes; the encoded value is appended temporarily during a write.
    buffer: Vec<u8>,
    /// Owned, resolved copy of the schema used for validation and encoding.
    resolved: ResolvedOwnedSchema,
}
543
544impl GenericSingleObjectWriter {
545 pub fn new_with_capacity(
546 schema: &Schema,
547 initial_buffer_cap: usize,
548 ) -> AvroResult<GenericSingleObjectWriter> {
549 let header_builder = RabinFingerprintHeader::from_schema(schema);
550 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
551 }
552
553 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
554 schema: &Schema,
555 initial_buffer_cap: usize,
556 header_builder: HB,
557 ) -> AvroResult<GenericSingleObjectWriter> {
558 let mut buffer = Vec::with_capacity(initial_buffer_cap);
559 let header = header_builder.build_header();
560 buffer.extend_from_slice(&header);
561
562 Ok(GenericSingleObjectWriter {
563 buffer,
564 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
565 })
566 }
567
568 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
569
570 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
572 let original_length = self.buffer.len();
573 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
574 Err(Details::IllegalSingleObjectWriterState.into())
575 } else {
576 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
577 writer
578 .write_all(&self.buffer)
579 .map_err(Details::WriteBytes)?;
580 let len = self.buffer.len();
581 self.buffer.truncate(original_length);
582 Ok(len)
583 }
584 }
585
586 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
588 self.write_value_ref(&v, writer)
589 }
590}
591
/// Single-object writer bound to the schema of a concrete type `T`.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Generic writer holding the header and the resolved schema.
    inner: GenericSingleObjectWriter,
    /// Schema obtained from `T::get_schema()`.
    schema: Schema,
    /// Set once `write_ref` has emitted the header.
    header_written: bool,
    /// Ties the writer to `T` without storing a value of it.
    _model: PhantomData<T>,
}
602
603impl<T> SpecificSingleObjectWriter<T>
604where
605 T: AvroSchema,
606{
607 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
608 let schema = T::get_schema();
609 Ok(SpecificSingleObjectWriter {
610 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
611 schema,
612 header_written: false,
613 _model: PhantomData,
614 })
615 }
616}
617
618impl<T> SpecificSingleObjectWriter<T>
619where
620 T: AvroSchema + Into<Value>,
621{
622 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
625 let v: Value = data.into();
626 self.inner.write_value_ref(&v, writer)
627 }
628}
629
630impl<T> SpecificSingleObjectWriter<T>
631where
632 T: AvroSchema + Serialize,
633{
634 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
637 let mut bytes_written: usize = 0;
638
639 if !self.header_written {
640 bytes_written += writer
641 .write(self.inner.buffer.as_slice())
642 .map_err(Details::WriteBytes)?;
643 self.header_written = true;
644 }
645
646 bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
647
648 Ok(bytes_written)
649 }
650
651 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
654 self.write_ref(&data, writer)
655 }
656}
657
658fn write_value_ref_resolved(
659 schema: &Schema,
660 resolved_schema: &ResolvedSchema,
661 value: &Value,
662 buffer: &mut Vec<u8>,
663) -> AvroResult<usize> {
664 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
665 Some(reason) => Err(Details::ValidationWithReason {
666 value: value.clone(),
667 schema: schema.clone(),
668 reason,
669 }
670 .into()),
671 None => encode_internal(
672 value,
673 schema,
674 resolved_schema.get_names(),
675 &schema.namespace(),
676 buffer,
677 ),
678 }
679}
680
681fn write_value_ref_owned_resolved(
682 resolved_schema: &ResolvedOwnedSchema,
683 value: &Value,
684 buffer: &mut Vec<u8>,
685) -> AvroResult<()> {
686 let root_schema = resolved_schema.get_root_schema();
687 if let Some(reason) = value.validate_internal(
688 root_schema,
689 resolved_schema.get_names(),
690 &root_schema.namespace(),
691 ) {
692 return Err(Details::ValidationWithReason {
693 value: value.clone(),
694 schema: root_schema.clone(),
695 reason,
696 }
697 .into());
698 }
699 encode_internal(
700 value,
701 root_schema,
702 resolved_schema.get_names(),
703 &root_schema.namespace(),
704 buffer,
705 )?;
706 Ok(())
707}
708
709pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
716 let mut buffer = Vec::new();
717 write_avro_datum(schema, value, &mut buffer)?;
718 Ok(buffer)
719}
720
721pub fn write_avro_datum_ref<T: Serialize, W: Write>(
728 schema: &Schema,
729 data: &T,
730 writer: &mut W,
731) -> AvroResult<usize> {
732 let names: HashMap<Name, &Schema> = HashMap::new();
733 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
734 let bytes_written = data.serialize(&mut serializer)?;
735 Ok(bytes_written)
736}
737
738pub fn to_avro_datum_schemata<T: Into<Value>>(
743 schema: &Schema,
744 schemata: Vec<&Schema>,
745 value: T,
746) -> AvroResult<Vec<u8>> {
747 let mut buffer = Vec::new();
748 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
749 Ok(buffer)
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn generate_sync_marker() -> [u8; 16] {
754 let mut marker = [0_u8; 16];
755 std::iter::repeat_with(rand::random)
756 .take(16)
757 .enumerate()
758 .for_each(|(i, n)| marker[i] = n);
759 marker
760}
761
/// Produces a 16-byte sync marker on wasm32 from four `quad_rand::rand` draws, each
/// contributing its big-endian bytes in order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
772
773#[cfg(test)]
774mod tests {
775 use std::{cell::RefCell, rc::Rc};
776
777 use super::*;
778 use crate::{
779 Reader,
780 decimal::Decimal,
781 duration::{Days, Duration, Millis, Months},
782 headers::GlueSchemaUuidHeader,
783 rabin::Rabin,
784 schema::{DecimalSchema, FixedSchema, Name},
785 types::Record,
786 util::zig_i64,
787 };
788 use pretty_assertions::assert_eq;
789 use serde::{Deserialize, Serialize};
790 use uuid::Uuid;
791
792 use crate::schema::InnerDecimalSchema;
793 use crate::{codec::DeflateSettings, error::Details};
794 use apache_avro_test_helper::TestResult;
795
796 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
797
798 const SCHEMA: &str = r#"
799 {
800 "type": "record",
801 "name": "test",
802 "fields": [
803 {
804 "name": "a",
805 "type": "long",
806 "default": 42
807 },
808 {
809 "name": "b",
810 "type": "string"
811 }
812 ]
813 }
814 "#;
815
816 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
817
818 #[test]
819 fn test_to_avro_datum() -> TestResult {
820 let schema = Schema::parse_str(SCHEMA)?;
821 let mut record = Record::new(&schema).unwrap();
822 record.put("a", 27i64);
823 record.put("b", "foo");
824
825 let mut expected = Vec::new();
826 zig_i64(27, &mut expected)?;
827 zig_i64(3, &mut expected)?;
828 expected.extend([b'f', b'o', b'o']);
829
830 assert_eq!(to_avro_datum(&schema, record)?, expected);
831
832 Ok(())
833 }
834
835 #[test]
836 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
837 #[derive(Serialize)]
838 struct TestStruct {
839 a: i64,
840 b: String,
841 }
842
843 let schema = Schema::parse_str(SCHEMA)?;
844 let mut writer: Vec<u8> = Vec::new();
845 let data = TestStruct {
846 a: 27,
847 b: "foo".to_string(),
848 };
849
850 let mut expected = Vec::new();
851 zig_i64(27, &mut expected)?;
852 zig_i64(3, &mut expected)?;
853 expected.extend([b'f', b'o', b'o']);
854
855 let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
856
857 assert_eq!(bytes, expected.len());
858 assert_eq!(writer, expected);
859
860 Ok(())
861 }
862
863 #[test]
864 fn avro_rs_220_flush_write_header() -> TestResult {
865 let schema = Schema::parse_str(SCHEMA)?;
866
867 let mut writer = Writer::new(&schema, Vec::new())?;
869 writer.flush()?;
870 let result = writer.into_inner()?;
871 assert_eq!(result.len(), 147);
872
873 let mut writer = Writer::builder()
875 .has_header(true)
876 .schema(&schema)
877 .writer(Vec::new())
878 .build()?;
879 writer.flush()?;
880 let result = writer.into_inner()?;
881 assert_eq!(result.len(), 0);
882
883 Ok(())
884 }
885
886 #[test]
887 fn test_union_not_null() -> TestResult {
888 let schema = Schema::parse_str(UNION_SCHEMA)?;
889 let union = Value::Union(1, Box::new(Value::Long(3)));
890
891 let mut expected = Vec::new();
892 zig_i64(1, &mut expected)?;
893 zig_i64(3, &mut expected)?;
894
895 assert_eq!(to_avro_datum(&schema, union)?, expected);
896
897 Ok(())
898 }
899
900 #[test]
901 fn test_union_null() -> TestResult {
902 let schema = Schema::parse_str(UNION_SCHEMA)?;
903 let union = Value::Union(0, Box::new(Value::Null));
904
905 let mut expected = Vec::new();
906 zig_i64(0, &mut expected)?;
907
908 assert_eq!(to_avro_datum(&schema, union)?, expected);
909
910 Ok(())
911 }
912
913 fn logical_type_test<T: Into<Value> + Clone>(
914 schema_str: &'static str,
915
916 expected_schema: &Schema,
917 value: Value,
918
919 raw_schema: &Schema,
920 raw_value: T,
921 ) -> TestResult {
922 let schema = Schema::parse_str(schema_str)?;
923 assert_eq!(&schema, expected_schema);
924 let ser = to_avro_datum(&schema, value.clone())?;
926 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
927 assert_eq!(ser, raw_ser);
928
929 let mut r = ser.as_slice();
931 let de = crate::from_avro_datum(&schema, &mut r, None)?;
932 assert_eq!(de, value);
933 Ok(())
934 }
935
936 #[test]
937 fn date() -> TestResult {
938 logical_type_test(
939 r#"{"type": "int", "logicalType": "date"}"#,
940 &Schema::Date,
941 Value::Date(1_i32),
942 &Schema::Int,
943 1_i32,
944 )
945 }
946
947 #[test]
948 fn time_millis() -> TestResult {
949 logical_type_test(
950 r#"{"type": "int", "logicalType": "time-millis"}"#,
951 &Schema::TimeMillis,
952 Value::TimeMillis(1_i32),
953 &Schema::Int,
954 1_i32,
955 )
956 }
957
958 #[test]
959 fn time_micros() -> TestResult {
960 logical_type_test(
961 r#"{"type": "long", "logicalType": "time-micros"}"#,
962 &Schema::TimeMicros,
963 Value::TimeMicros(1_i64),
964 &Schema::Long,
965 1_i64,
966 )
967 }
968
969 #[test]
970 fn timestamp_millis() -> TestResult {
971 logical_type_test(
972 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
973 &Schema::TimestampMillis,
974 Value::TimestampMillis(1_i64),
975 &Schema::Long,
976 1_i64,
977 )
978 }
979
980 #[test]
981 fn timestamp_micros() -> TestResult {
982 logical_type_test(
983 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
984 &Schema::TimestampMicros,
985 Value::TimestampMicros(1_i64),
986 &Schema::Long,
987 1_i64,
988 )
989 }
990
991 #[test]
992 fn decimal_fixed() -> TestResult {
993 let size = 30;
994 let fixed = FixedSchema {
995 name: Name::new("decimal")?,
996 aliases: None,
997 doc: None,
998 size,
999 default: None,
1000 attributes: Default::default(),
1001 };
1002 let inner = InnerDecimalSchema::Fixed(fixed.clone());
1003 let value = vec![0u8; size];
1004 logical_type_test(
1005 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1006 &Schema::Decimal(DecimalSchema {
1007 precision: 20,
1008 scale: 5,
1009 inner,
1010 }),
1011 Value::Decimal(Decimal::from(value.clone())),
1012 &Schema::Fixed(fixed),
1013 Value::Fixed(size, value),
1014 )
1015 }
1016
1017 #[test]
1018 fn decimal_bytes() -> TestResult {
1019 let value = vec![0u8; 10];
1020 logical_type_test(
1021 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1022 &Schema::Decimal(DecimalSchema {
1023 precision: 4,
1024 scale: 3,
1025 inner: InnerDecimalSchema::Bytes,
1026 }),
1027 Value::Decimal(Decimal::from(value.clone())),
1028 &Schema::Bytes,
1029 value,
1030 )
1031 }
1032
1033 #[test]
1034 fn duration() -> TestResult {
1035 let inner = Schema::Fixed(FixedSchema {
1036 name: Name::new("duration")?,
1037 aliases: None,
1038 doc: None,
1039 size: 12,
1040 default: None,
1041 attributes: Default::default(),
1042 });
1043 let value = Value::Duration(Duration::new(
1044 Months::new(256),
1045 Days::new(512),
1046 Millis::new(1024),
1047 ));
1048 logical_type_test(
1049 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1050 &Schema::Duration(FixedSchema {
1051 name: Name::from("duration"),
1052 aliases: None,
1053 doc: None,
1054 size: 12,
1055 default: None,
1056 attributes: Default::default(),
1057 }),
1058 value,
1059 &inner,
1060 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1061 )
1062 }
1063
1064 #[test]
1065 fn test_writer_append() -> TestResult {
1066 let schema = Schema::parse_str(SCHEMA)?;
1067 let mut writer = Writer::new(&schema, Vec::new())?;
1068
1069 let mut record = Record::new(&schema).unwrap();
1070 record.put("a", 27i64);
1071 record.put("b", "foo");
1072
1073 let n1 = writer.append(record.clone())?;
1074 let n2 = writer.append(record.clone())?;
1075 let n3 = writer.flush()?;
1076 let result = writer.into_inner()?;
1077
1078 assert_eq!(n1 + n2 + n3, result.len());
1079
1080 let mut data = Vec::new();
1081 zig_i64(27, &mut data)?;
1082 zig_i64(3, &mut data)?;
1083 data.extend(b"foo");
1084 data.extend(data.clone());
1085
1086 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1088 let last_data_byte = result.len() - 16;
1090 assert_eq!(
1091 &result[last_data_byte - data.len()..last_data_byte],
1092 data.as_slice()
1093 );
1094
1095 Ok(())
1096 }
1097
1098 #[test]
1099 fn test_writer_extend() -> TestResult {
1100 let schema = Schema::parse_str(SCHEMA)?;
1101 let mut writer = Writer::new(&schema, Vec::new())?;
1102
1103 let mut record = Record::new(&schema).unwrap();
1104 record.put("a", 27i64);
1105 record.put("b", "foo");
1106 let record_copy = record.clone();
1107 let records = vec![record, record_copy];
1108
1109 let n1 = writer.extend(records)?;
1110 let n2 = writer.flush()?;
1111 let result = writer.into_inner()?;
1112
1113 assert_eq!(n1 + n2, result.len());
1114
1115 let mut data = Vec::new();
1116 zig_i64(27, &mut data)?;
1117 zig_i64(3, &mut data)?;
1118 data.extend(b"foo");
1119 data.extend(data.clone());
1120
1121 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1123 let last_data_byte = result.len() - 16;
1125 assert_eq!(
1126 &result[last_data_byte - data.len()..last_data_byte],
1127 data.as_slice()
1128 );
1129
1130 Ok(())
1131 }
1132
    /// Serde counterpart of the `SCHEMA` record, used by the `*_ser` writer tests.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
1138
1139 #[test]
1140 fn test_writer_append_ser() -> TestResult {
1141 let schema = Schema::parse_str(SCHEMA)?;
1142 let mut writer = Writer::new(&schema, Vec::new())?;
1143
1144 let record = TestSerdeSerialize {
1145 a: 27,
1146 b: "foo".to_owned(),
1147 };
1148
1149 let n1 = writer.append_ser(record)?;
1150 let n2 = writer.flush()?;
1151 let result = writer.into_inner()?;
1152
1153 assert_eq!(n1 + n2, result.len());
1154
1155 let mut data = Vec::new();
1156 zig_i64(27, &mut data)?;
1157 zig_i64(3, &mut data)?;
1158 data.extend(b"foo");
1159
1160 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1162 let last_data_byte = result.len() - 16;
1164 assert_eq!(
1165 &result[last_data_byte - data.len()..last_data_byte],
1166 data.as_slice()
1167 );
1168
1169 Ok(())
1170 }
1171
1172 #[test]
1173 fn test_writer_extend_ser() -> TestResult {
1174 let schema = Schema::parse_str(SCHEMA)?;
1175 let mut writer = Writer::new(&schema, Vec::new())?;
1176
1177 let record = TestSerdeSerialize {
1178 a: 27,
1179 b: "foo".to_owned(),
1180 };
1181 let record_copy = record.clone();
1182 let records = vec![record, record_copy];
1183
1184 let n1 = writer.extend_ser(records)?;
1185 let n2 = writer.flush()?;
1186 let result = writer.into_inner()?;
1187
1188 assert_eq!(n1 + n2, result.len());
1189
1190 let mut data = Vec::new();
1191 zig_i64(27, &mut data)?;
1192 zig_i64(3, &mut data)?;
1193 data.extend(b"foo");
1194 data.extend(data.clone());
1195
1196 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1198 let last_data_byte = result.len() - 16;
1200 assert_eq!(
1201 &result[last_data_byte - data.len()..last_data_byte],
1202 data.as_slice()
1203 );
1204
1205 Ok(())
1206 }
1207
1208 fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1209 Writer::with_codec(
1210 schema,
1211 Vec::new(),
1212 Codec::Deflate(DeflateSettings::default()),
1213 )
1214 }
1215
1216 fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1217 Writer::builder()
1218 .writer(Vec::new())
1219 .schema(schema)
1220 .codec(Codec::Deflate(DeflateSettings::default()))
1221 .block_size(100)
1222 .build()
1223 }
1224
1225 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1226 let mut record = Record::new(schema).unwrap();
1227 record.put("a", 27i64);
1228 record.put("b", "foo");
1229
1230 let n1 = writer.append(record.clone())?;
1231 let n2 = writer.append(record.clone())?;
1232 let n3 = writer.flush()?;
1233 let result = writer.into_inner()?;
1234
1235 assert_eq!(n1 + n2 + n3, result.len());
1236
1237 let mut data = Vec::new();
1238 zig_i64(27, &mut data)?;
1239 zig_i64(3, &mut data)?;
1240 data.extend(b"foo");
1241 data.extend(data.clone());
1242 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1243
1244 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1246 let last_data_byte = result.len() - 16;
1248 assert_eq!(
1249 &result[last_data_byte - data.len()..last_data_byte],
1250 data.as_slice()
1251 );
1252
1253 Ok(())
1254 }
1255
1256 #[test]
1257 fn test_writer_with_codec() -> TestResult {
1258 let schema = Schema::parse_str(SCHEMA)?;
1259 let writer = make_writer_with_codec(&schema)?;
1260 check_writer(writer, &schema)
1261 }
1262
1263 #[test]
1264 fn test_writer_with_builder() -> TestResult {
1265 let schema = Schema::parse_str(SCHEMA)?;
1266 let writer = make_writer_with_builder(&schema)?;
1267 check_writer(writer, &schema)
1268 }
1269
1270 #[test]
1271 fn test_logical_writer() -> TestResult {
1272 const LOGICAL_TYPE_SCHEMA: &str = r#"
1273 {
1274 "type": "record",
1275 "name": "logical_type_test",
1276 "fields": [
1277 {
1278 "name": "a",
1279 "type": [
1280 "null",
1281 {
1282 "type": "long",
1283 "logicalType": "timestamp-micros"
1284 }
1285 ]
1286 }
1287 ]
1288 }
1289 "#;
1290 let codec = Codec::Deflate(DeflateSettings::default());
1291 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1292 let mut writer = Writer::builder()
1293 .schema(&schema)
1294 .codec(codec)
1295 .writer(Vec::new())
1296 .build()?;
1297
1298 let mut record1 = Record::new(&schema).unwrap();
1299 record1.put(
1300 "a",
1301 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1302 );
1303
1304 let mut record2 = Record::new(&schema).unwrap();
1305 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1306
1307 let n1 = writer.append(record1)?;
1308 let n2 = writer.append(record2)?;
1309 let n3 = writer.flush()?;
1310 let result = writer.into_inner()?;
1311
1312 assert_eq!(n1 + n2 + n3, result.len());
1313
1314 let mut data = Vec::new();
1315 zig_i64(1, &mut data)?;
1317 zig_i64(1234, &mut data)?;
1318
1319 zig_i64(0, &mut data)?;
1321 codec.compress(&mut data)?;
1322
1323 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1325 let last_data_byte = result.len() - 16;
1327 assert_eq!(
1328 &result[last_data_byte - data.len()..last_data_byte],
1329 data.as_slice()
1330 );
1331
1332 Ok(())
1333 }
1334
1335 #[test]
1336 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1337 let schema = Schema::parse_str(SCHEMA)?;
1338 let mut writer = Writer::new(&schema, Vec::new())?;
1339
1340 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1341 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1342 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1343 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1344
1345 let mut record = Record::new(&schema).unwrap();
1346 record.put("a", 27i64);
1347 record.put("b", "foo");
1348
1349 writer.append(record.clone())?;
1350 writer.append(record.clone())?;
1351 writer.flush()?;
1352 let result = writer.into_inner()?;
1353
1354 assert_eq!(result.len(), 244);
1355
1356 Ok(())
1357 }
1358
1359 #[test]
1360 fn test_avro_3881_metadata_empty_body() -> TestResult {
1361 let schema = Schema::parse_str(SCHEMA)?;
1362 let mut writer = Writer::new(&schema, Vec::new())?;
1363 writer.add_user_metadata("a".to_string(), "b")?;
1364 let result = writer.into_inner()?;
1365
1366 let reader = Reader::with_schema(&schema, &result[..])?;
1367 let mut expected = HashMap::new();
1368 expected.insert("a".to_string(), vec![b'b']);
1369 assert_eq!(reader.user_metadata(), &expected);
1370 assert_eq!(reader.into_iter().count(), 0);
1371
1372 Ok(())
1373 }
1374
1375 #[test]
1376 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1377 let schema = Schema::parse_str(SCHEMA)?;
1378 let mut writer = Writer::new(&schema, Vec::new())?;
1379
1380 let mut record = Record::new(&schema).unwrap();
1381 record.put("a", 27i64);
1382 record.put("b", "foo");
1383 writer.append(record.clone())?;
1384
1385 match writer
1386 .add_user_metadata("stringKey".to_string(), String::from("value2"))
1387 .map_err(Error::into_details)
1388 {
1389 Err(e @ Details::FileHeaderAlreadyWritten) => {
1390 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1391 }
1392 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1393 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1394 }
1395
1396 Ok(())
1397 }
1398
/// Regression test (named for AVRO-3405): a metadata key starting with `avro.` must be
/// rejected with `Details::InvalidMetadataKey`.
#[test]
fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let key = "avro.stringKey".to_string();
    let outcome = writer
        .add_user_metadata(key.clone(), "value")
        .map_err(Error::into_details);

    match outcome {
        Ok(_) => {
            panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
        }
        Err(ref e @ Details::InvalidMetadataKey(_)) => {
            // The error message is expected to echo the offending key.
            let expected_message = format!(
                "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
            );
            assert_eq!(e.to_string(), expected_message)
        }
        Err(e) => panic!(
            "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
        ),
    }

    Ok(())
}
1427
/// Regression test (named for AVRO-3405): metadata handed to `Writer::builder()` is stored
/// unchanged in the resulting writer.
#[test]
fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;

    // One string value and two byte-array values.
    let metadata: HashMap<String, Value> = HashMap::from([
        (
            "stringKey".to_string(),
            Value::String("stringValue".to_string()),
        ),
        ("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec())),
        ("vecKey".to_string(), Value::Bytes(vec![1, 2, 3])),
    ]);

    let writer: Writer<'_, Vec<u8>> = Writer::builder()
        .writer(Vec::new())
        .schema(&schema)
        .user_metadata(metadata.clone())
        .build()?;

    assert_eq!(writer.user_metadata, metadata);

    Ok(())
}
1450
1451 #[derive(Serialize, Clone)]
1452 struct TestSingleObjectWriter {
1453 a: i64,
1454 b: f64,
1455 c: Vec<String>,
1456 }
1457
1458 impl AvroSchema for TestSingleObjectWriter {
1459 fn get_schema() -> Schema {
1460 let schema = r#"
1461 {
1462 "type":"record",
1463 "name":"TestSingleObjectWrtierSerialize",
1464 "fields":[
1465 {
1466 "name":"a",
1467 "type":"long"
1468 },
1469 {
1470 "name":"b",
1471 "type":"double"
1472 },
1473 {
1474 "name":"c",
1475 "type":{
1476 "type":"array",
1477 "items":"string"
1478 }
1479 }
1480 ]
1481 }
1482 "#;
1483 Schema::parse_str(schema).unwrap()
1484 }
1485 }
1486
1487 impl From<TestSingleObjectWriter> for Value {
1488 fn from(obj: TestSingleObjectWriter) -> Value {
1489 Value::Record(vec![
1490 ("a".into(), obj.a.into()),
1491 ("b".into(), obj.b.into()),
1492 (
1493 "c".into(),
1494 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1495 ),
1496 ])
1497 }
1498 }
1499
/// Checks the bytes produced by `GenericSingleObjectWriter::write_value_ref`: a two-byte
/// marker, the schema's Rabin fingerprint, then the plain binary encoding of the value.
#[test]
fn test_single_object_writer() -> TestResult {
    let schema = TestSingleObjectWriter::get_schema();
    let value: Value = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    }
    .into();

    let mut writer = GenericSingleObjectWriter::new_with_capacity(&schema, 1024)
        .expect("Should resolve schema");

    let mut buf: Vec<u8> = Vec::new();
    let written_bytes = writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    // More than the 10 header bytes must be present, and the reported count must match.
    assert!(buf.len() > 10, "no bytes written");
    assert_eq!(buf.len(), written_bytes);

    // Bytes 0..2 are the marker, bytes 2..10 the fingerprint.
    assert_eq!(buf[0], 0xC3);
    assert_eq!(buf[1], 0x01);
    assert_eq!(&buf[2..10], &schema.fingerprint::<Rabin>().bytes[..]);

    // Everything after the header equals what `encode` produces on its own.
    let mut msg_binary = Vec::new();
    encode(&value, &schema, &mut msg_binary)
        .expect("encode should have failed by here as a dependency of any writing");
    assert_eq!(&buf[10..], &msg_binary[..]);

    Ok(())
}
1539
/// Checks that a writer built with a `GlueSchemaUuidHeader` starts its output with the
/// bytes `0x03 0x00` followed by the 16 bytes of the schema UUID.
#[test]
fn test_single_object_writer_with_header_builder() -> TestResult {
    let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;

    let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
        &TestSingleObjectWriter::get_schema(),
        1024,
        GlueSchemaUuidHeader::from_uuid(schema_uuid),
    )
    .expect("Should resolve schema");

    let value: Value = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    }
    .into();

    let mut buf: Vec<u8> = Vec::new();
    writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    assert_eq!(buf[0], 0x03);
    assert_eq!(buf[1], 0x00);
    assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);

    Ok(())
}
1566
/// Checks that `SpecificSingleObjectWriter::write`, `SpecificSingleObjectWriter::write_value`
/// and `GenericSingleObjectWriter::write_value` all emit identical bytes for the same object.
#[test]
fn test_writer_parity() -> TestResult {
    let sample = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
        &TestSingleObjectWriter::get_schema(),
        1024,
    )
    .expect("Should resolve schema");
    let mut specific_writer =
        SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
            .expect("Resolved should pass");

    // One output buffer per write path.
    let mut specific_write_out: Vec<u8> = Vec::new();
    let mut specific_write_value_out: Vec<u8> = Vec::new();
    let mut generic_write_value_out: Vec<u8> = Vec::new();

    specific_writer
        .write(sample.clone(), &mut specific_write_out)
        .expect("Serialization expected");
    specific_writer
        .write_value(sample.clone(), &mut specific_write_value_out)
        .expect("Serialization expected");
    generic_writer
        .write_value(sample.into(), &mut generic_write_value_out)
        .expect("Serialization expected");

    assert_eq!(specific_write_out, specific_write_value_out);
    assert_eq!(specific_write_out, generic_write_value_out);

    Ok(())
}
1601
/// Regression test (named for AVRO-3894): a struct field named `time` serializes against
/// the schema field `date`, which lists `time` among its aliases.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        // Matches the schema field `date` only through its `time` alias.
        pub time: Option<i64>,
    }

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let bytes = writer.append_ser(Conference {
        name: "RustConf".to_string(),
        time: Some(1234567890),
    })?;

    assert_eq!(182, bytes);

    Ok(())
}
1634
/// Regression test (named for AVRO-4014): serializing a value whose type does not fit the
/// schema must fail with an error message naming the field, the record schema and the cause.
#[test]
fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    #[derive(Debug, PartialEq, Clone, Serialize)]
    pub struct Conference {
        pub name: String,
        // `f64` does not match the schema's `["null", "long"]` union, which triggers the error.
        pub time: Option<f64>,
    }

    let expected_message = r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#;

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let conf = Conference {
        name: "RustConf".to_string(),
        time: Some(12345678.90),
    };

    let error = match writer.append_ser(conf) {
        Err(e) => e,
        Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
    };
    assert_eq!(error.to_string(), expected_message);

    Ok(())
}
1672
/// Regression test (named for AVRO-4063): `Writer::flush` must also flush the wrapped
/// writer, so that bytes held by a `BufWriter` reach the underlying buffer.
#[test]
fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "ExampleSchema",
        "fields": [
            {"name": "exampleField", "type": "string"}
        ]
    }
    "#;

    /// Byte sink whose contents stay observable through clones sharing the same `Rc`.
    #[derive(Clone, Default)]
    struct TestBuffer(Rc<RefCell<Vec<u8>>>);

    impl TestBuffer {
        /// Number of bytes that have reached the shared vector so far.
        fn len(&self) -> usize {
            let bytes = self.0.borrow();
            bytes.len()
        }
    }

    impl Write for TestBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut bytes = self.0.borrow_mut();
            bytes.write(buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let sink = TestBuffer::default();
    let schema = Schema::parse_str(SCHEMA)?;

    // The `BufWriter` sits between the Avro writer and the sink; the sink only sees the
    // data once the `BufWriter` itself is flushed.
    let mut writer = Writer::new(&schema, std::io::BufWriter::new(sink.clone()))?;

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("exampleField", "value");

    writer.append(record)?;
    writer.flush()?;

    assert_eq!(
        sink.len(),
        151,
        "the test buffer was not fully written to after Writer::flush was called"
    );

    Ok(())
}
1726}