1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 serde::ser_schema::SchemaAwareWriteSerializer,
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
33const DEFAULT_BLOCK_SIZE: usize = 16000;
34const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
36pub struct Writer<'a, W: Write> {
42 schema: &'a Schema,
43 writer: W,
44 resolved_schema: ResolvedSchema<'a>,
45 codec: Codec,
46 block_size: usize,
47 buffer: Vec<u8>,
48 num_values: usize,
49 marker: [u8; 16],
50 has_header: bool,
51 user_metadata: HashMap<String, Value>,
52}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56 #[builder]
57 pub fn builder(
58 schema: &'a Schema,
59 schemata: Option<Vec<&'a Schema>>,
60 writer: W,
61 #[builder(default = Codec::Null)] codec: Codec,
62 #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63 #[builder(default = generate_sync_marker())] marker: [u8; 16],
64 #[builder(default = false)]
68 has_header: bool,
69 #[builder(default)] user_metadata: HashMap<String, Value>,
70 ) -> AvroResult<Self> {
71 let resolved_schema = if let Some(schemata) = schemata {
72 ResolvedSchema::try_from(schemata)?
73 } else {
74 ResolvedSchema::try_from(schema)?
75 };
76 Ok(Self {
77 schema,
78 writer,
79 resolved_schema,
80 codec,
81 block_size,
82 buffer: Vec::with_capacity(block_size),
83 num_values: 0,
84 marker,
85 has_header,
86 user_metadata,
87 })
88 }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96 Writer::with_codec(schema, writer, Codec::Null)
97 }
98
99 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102 Self::builder()
103 .schema(schema)
104 .writer(writer)
105 .codec(codec)
106 .build()
107 }
108
109 pub fn with_schemata(
114 schema: &'a Schema,
115 schemata: Vec<&'a Schema>,
116 writer: W,
117 codec: Codec,
118 ) -> AvroResult<Self> {
119 Self::builder()
120 .schema(schema)
121 .schemata(schemata)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132 }
133
134 pub fn append_to_with_codec(
137 schema: &'a Schema,
138 writer: W,
139 codec: Codec,
140 marker: [u8; 16],
141 ) -> AvroResult<Self> {
142 Self::builder()
143 .schema(schema)
144 .writer(writer)
145 .codec(codec)
146 .marker(marker)
147 .has_header(true)
148 .build()
149 }
150
151 pub fn append_to_with_codec_schemata(
154 schema: &'a Schema,
155 schemata: Vec<&'a Schema>,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .schemata(schemata)
163 .writer(writer)
164 .codec(codec)
165 .marker(marker)
166 .has_header(true)
167 .build()
168 }
169
170 pub fn schema(&self) -> &'a Schema {
172 self.schema
173 }
174
175 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184 let n = self.maybe_write_header()?;
185
186 let avro = value.into();
187 self.append_value_ref(&avro).map(|m| m + n)
188 }
189
190 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198 let n = self.maybe_write_header()?;
199
200 write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201 self.num_values += 1;
202
203 if self.buffer.len() >= self.block_size {
204 return self.flush().map(|b| b + n);
205 }
206
207 Ok(n)
208 }
209
210 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220 let n = self.maybe_write_header()?;
221
222 let mut serializer = SchemaAwareWriteSerializer::new(
223 &mut self.buffer,
224 self.schema,
225 self.resolved_schema.get_names(),
226 None,
227 );
228 value.serialize(&mut serializer)?;
229 self.num_values += 1;
230
231 if self.buffer.len() >= self.block_size {
232 return self.flush().map(|b| b + n);
233 }
234
235 Ok(n)
236 }
237
238 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246 where
247 I: IntoIterator<Item = T>,
248 {
249 let mut num_bytes = 0;
264 for value in values {
265 num_bytes += self.append(value)?;
266 }
267 num_bytes += self.flush()?;
268
269 Ok(num_bytes)
270 }
271
272 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281 where
282 I: IntoIterator<Item = T>,
283 {
284 let mut num_bytes = 0;
299 for value in values {
300 num_bytes += self.append_ser(value)?;
301 }
302 num_bytes += self.flush()?;
303
304 Ok(num_bytes)
305 }
306
307 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315 let mut num_bytes = 0;
316 for value in values {
317 num_bytes += self.append_value_ref(value)?;
318 }
319 num_bytes += self.flush()?;
320
321 Ok(num_bytes)
322 }
323
324 pub fn flush(&mut self) -> AvroResult<usize> {
332 let mut num_bytes = self.maybe_write_header()?;
333 if self.num_values == 0 {
334 return Ok(num_bytes);
335 }
336
337 self.codec.compress(&mut self.buffer)?;
338
339 let num_values = self.num_values;
340 let stream_len = self.buffer.len();
341
342 num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343 + self.append_raw(&stream_len.into(), &Schema::Long)?
344 + self
345 .writer
346 .write(self.buffer.as_ref())
347 .map_err(Details::WriteBytes)?
348 + self.append_marker()?;
349
350 self.buffer.clear();
351 self.num_values = 0;
352
353 self.writer.flush().map_err(Details::FlushWriter)?;
354
355 Ok(num_bytes)
356 }
357
358 pub fn into_inner(mut self) -> AvroResult<W> {
363 self.maybe_write_header()?;
364 self.flush()?;
365
366 let mut this = ManuallyDrop::new(self);
367
368 let _buffer = std::mem::take(&mut this.buffer);
370 let _user_metadata = std::mem::take(&mut this.user_metadata);
371 unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
373
374 let writer = unsafe { std::ptr::read(&this.writer) };
376
377 Ok(writer)
378 }
379
380 pub fn get_ref(&self) -> &W {
385 &self.writer
386 }
387
388 pub fn get_mut(&mut self) -> &mut W {
395 &mut self.writer
396 }
397
398 fn append_marker(&mut self) -> AvroResult<usize> {
400 self.writer
403 .write(&self.marker)
404 .map_err(|e| Details::WriteMarker(e).into())
405 }
406
407 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410 }
411
412 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414 self.writer
415 .write(bytes)
416 .map_err(|e| Details::WriteBytes(e).into())
417 }
418
419 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422 if !self.has_header {
423 if key.starts_with("avro.") {
424 return Err(Details::InvalidMetadataKey(key).into());
425 }
426 self.user_metadata
427 .insert(key, Value::Bytes(value.as_ref().to_vec()));
428 Ok(())
429 } else {
430 Err(Details::FileHeaderAlreadyWritten.into())
431 }
432 }
433
434 fn header(&self) -> Result<Vec<u8>, Error> {
436 let schema_bytes = serde_json::to_string(self.schema)
437 .map_err(Details::ConvertJsonToString)?
438 .into_bytes();
439
440 let mut metadata = HashMap::with_capacity(2);
441 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
442 if self.codec != Codec::Null {
443 metadata.insert("avro.codec", self.codec.into());
444 }
445 match self.codec {
446 #[cfg(feature = "bzip")]
447 Codec::Bzip2(settings) => {
448 metadata.insert(
449 "avro.codec.compression_level",
450 Value::Bytes(vec![settings.compression_level]),
451 );
452 }
453 #[cfg(feature = "xz")]
454 Codec::Xz(settings) => {
455 metadata.insert(
456 "avro.codec.compression_level",
457 Value::Bytes(vec![settings.compression_level]),
458 );
459 }
460 #[cfg(feature = "zstandard")]
461 Codec::Zstandard(settings) => {
462 metadata.insert(
463 "avro.codec.compression_level",
464 Value::Bytes(vec![settings.compression_level]),
465 );
466 }
467 _ => {}
468 }
469
470 for (k, v) in &self.user_metadata {
471 metadata.insert(k.as_str(), v.clone());
472 }
473
474 let mut header = Vec::new();
475 header.extend_from_slice(AVRO_OBJECT_HEADER);
476 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
477 header.extend_from_slice(&self.marker);
478
479 Ok(header)
480 }
481
482 fn maybe_write_header(&mut self) -> AvroResult<usize> {
483 if !self.has_header {
484 let header = self.header()?;
485 let n = self.append_bytes(header.as_ref())?;
486 self.has_header = true;
487 Ok(n)
488 } else {
489 Ok(0)
490 }
491 }
492}
493
494impl<W: Write> Drop for Writer<'_, W> {
495 fn drop(&mut self) {
497 let _ = self.maybe_write_header();
498 let _ = self.flush();
499 }
500}
501
502fn write_avro_datum<T: Into<Value>, W: Write>(
508 schema: &Schema,
509 value: T,
510 writer: &mut W,
511) -> Result<(), Error> {
512 let avro = value.into();
513 if !avro.validate(schema) {
514 return Err(Details::Validation.into());
515 }
516 encode(&avro, schema, writer)?;
517 Ok(())
518}
519
520fn write_avro_datum_schemata<T: Into<Value>>(
521 schema: &Schema,
522 schemata: Vec<&Schema>,
523 value: T,
524 buffer: &mut Vec<u8>,
525) -> AvroResult<usize> {
526 let avro = value.into();
527 let rs = ResolvedSchema::try_from(schemata)?;
528 let names = rs.get_names();
529 let enclosing_namespace = schema.namespace();
530 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
531 return Err(Details::Validation.into());
532 }
533 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
534}
535
536pub struct GenericSingleObjectWriter {
540 buffer: Vec<u8>,
541 resolved: ResolvedOwnedSchema,
542}
543
544impl GenericSingleObjectWriter {
545 pub fn new_with_capacity(
546 schema: &Schema,
547 initial_buffer_cap: usize,
548 ) -> AvroResult<GenericSingleObjectWriter> {
549 let header_builder = RabinFingerprintHeader::from_schema(schema);
550 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
551 }
552
553 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
554 schema: &Schema,
555 initial_buffer_cap: usize,
556 header_builder: HB,
557 ) -> AvroResult<GenericSingleObjectWriter> {
558 let mut buffer = Vec::with_capacity(initial_buffer_cap);
559 let header = header_builder.build_header();
560 buffer.extend_from_slice(&header);
561
562 Ok(GenericSingleObjectWriter {
563 buffer,
564 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
565 })
566 }
567
568 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
569
570 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
572 let original_length = self.buffer.len();
573 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
574 Err(Details::IllegalSingleObjectWriterState.into())
575 } else {
576 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
577 writer
578 .write_all(&self.buffer)
579 .map_err(Details::WriteBytes)?;
580 let len = self.buffer.len();
581 self.buffer.truncate(original_length);
582 Ok(len)
583 }
584 }
585
586 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
588 self.write_value_ref(&v, writer)
589 }
590}
591
592pub struct SpecificSingleObjectWriter<T>
594where
595 T: AvroSchema,
596{
597 inner: GenericSingleObjectWriter,
598 schema: Schema,
599 header_written: bool,
600 _model: PhantomData<T>,
601}
602
603impl<T> SpecificSingleObjectWriter<T>
604where
605 T: AvroSchema,
606{
607 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
608 let schema = T::get_schema();
609 Ok(SpecificSingleObjectWriter {
610 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
611 schema,
612 header_written: false,
613 _model: PhantomData,
614 })
615 }
616}
617
618impl<T> SpecificSingleObjectWriter<T>
619where
620 T: AvroSchema + Into<Value>,
621{
622 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
625 let v: Value = data.into();
626 self.inner.write_value_ref(&v, writer)
627 }
628}
629
630impl<T> SpecificSingleObjectWriter<T>
631where
632 T: AvroSchema + Serialize,
633{
634 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
637 let mut bytes_written: usize = 0;
638
639 if !self.header_written {
640 bytes_written += writer
641 .write(self.inner.buffer.as_slice())
642 .map_err(Details::WriteBytes)?;
643 self.header_written = true;
644 }
645
646 bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
647
648 Ok(bytes_written)
649 }
650
651 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
654 self.write_ref(&data, writer)
655 }
656}
657
658fn write_value_ref_resolved(
659 schema: &Schema,
660 resolved_schema: &ResolvedSchema,
661 value: &Value,
662 buffer: &mut Vec<u8>,
663) -> AvroResult<usize> {
664 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
665 Some(reason) => Err(Details::ValidationWithReason {
666 value: value.clone(),
667 schema: schema.clone(),
668 reason,
669 }
670 .into()),
671 None => encode_internal(
672 value,
673 schema,
674 resolved_schema.get_names(),
675 &schema.namespace(),
676 buffer,
677 ),
678 }
679}
680
681fn write_value_ref_owned_resolved(
682 resolved_schema: &ResolvedOwnedSchema,
683 value: &Value,
684 buffer: &mut Vec<u8>,
685) -> AvroResult<()> {
686 let root_schema = resolved_schema.get_root_schema();
687 if let Some(reason) = value.validate_internal(
688 root_schema,
689 resolved_schema.get_names(),
690 &root_schema.namespace(),
691 ) {
692 return Err(Details::ValidationWithReason {
693 value: value.clone(),
694 schema: root_schema.clone(),
695 reason,
696 }
697 .into());
698 }
699 encode_internal(
700 value,
701 root_schema,
702 resolved_schema.get_names(),
703 &root_schema.namespace(),
704 buffer,
705 )?;
706 Ok(())
707}
708
709pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
716 let mut buffer = Vec::new();
717 write_avro_datum(schema, value, &mut buffer)?;
718 Ok(buffer)
719}
720
721pub fn write_avro_datum_ref<T: Serialize, W: Write>(
728 schema: &Schema,
729 data: &T,
730 writer: &mut W,
731) -> AvroResult<usize> {
732 let names: HashMap<Name, &Schema> = HashMap::new();
733 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
734 let bytes_written = data.serialize(&mut serializer)?;
735 Ok(bytes_written)
736}
737
738pub fn to_avro_datum_schemata<T: Into<Value>>(
743 schema: &Schema,
744 schemata: Vec<&Schema>,
745 value: T,
746) -> AvroResult<Vec<u8>> {
747 let mut buffer = Vec::new();
748 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
749 Ok(buffer)
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn generate_sync_marker() -> [u8; 16] {
754 let mut marker = [0_u8; 16];
755 std::iter::repeat_with(rand::random)
756 .take(16)
757 .enumerate()
758 .for_each(|(i, n)| marker[i] = n);
759 marker
760}
761
762#[cfg(target_arch = "wasm32")]
763fn generate_sync_marker() -> [u8; 16] {
764 let mut marker = [0_u8; 16];
765 std::iter::repeat_with(quad_rand::rand)
766 .take(4)
767 .flat_map(|i| i.to_be_bytes())
768 .enumerate()
769 .for_each(|(i, n)| marker[i] = n);
770 marker
771}
772
773#[cfg(test)]
774mod tests {
775 use std::{cell::RefCell, rc::Rc};
776
777 use super::*;
778 use crate::{
779 Reader,
780 decimal::Decimal,
781 duration::{Days, Duration, Millis, Months},
782 headers::GlueSchemaUuidHeader,
783 rabin::Rabin,
784 schema::{DecimalSchema, FixedSchema, Name},
785 types::Record,
786 util::zig_i64,
787 };
788 use pretty_assertions::assert_eq;
789 use serde::{Deserialize, Serialize};
790 use uuid::Uuid;
791
792 use crate::schema::InnerDecimalSchema;
793 use crate::{codec::DeflateSettings, error::Details};
794 use apache_avro_test_helper::TestResult;
795
796 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
797
798 const SCHEMA: &str = r#"
799 {
800 "type": "record",
801 "name": "test",
802 "fields": [
803 {
804 "name": "a",
805 "type": "long",
806 "default": 42
807 },
808 {
809 "name": "b",
810 "type": "string"
811 }
812 ]
813 }
814 "#;
815
816 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
817
818 #[test]
819 fn test_to_avro_datum() -> TestResult {
820 let schema = Schema::parse_str(SCHEMA)?;
821 let mut record = Record::new(&schema).unwrap();
822 record.put("a", 27i64);
823 record.put("b", "foo");
824
825 let mut expected = Vec::new();
826 zig_i64(27, &mut expected)?;
827 zig_i64(3, &mut expected)?;
828 expected.extend([b'f', b'o', b'o']);
829
830 assert_eq!(to_avro_datum(&schema, record)?, expected);
831
832 Ok(())
833 }
834
835 #[test]
836 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
837 #[derive(Serialize)]
838 struct TestStruct {
839 a: i64,
840 b: String,
841 }
842
843 let schema = Schema::parse_str(SCHEMA)?;
844 let mut writer: Vec<u8> = Vec::new();
845 let data = TestStruct {
846 a: 27,
847 b: "foo".to_string(),
848 };
849
850 let mut expected = Vec::new();
851 zig_i64(27, &mut expected)?;
852 zig_i64(3, &mut expected)?;
853 expected.extend([b'f', b'o', b'o']);
854
855 let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
856
857 assert_eq!(bytes, expected.len());
858 assert_eq!(writer, expected);
859
860 Ok(())
861 }
862
863 #[test]
864 fn avro_rs_220_flush_write_header() -> TestResult {
865 let schema = Schema::parse_str(SCHEMA)?;
866
867 let mut writer = Writer::new(&schema, Vec::new())?;
869 writer.flush()?;
870 let result = writer.into_inner()?;
871 assert_eq!(result.len(), 147);
872
873 let mut writer = Writer::builder()
875 .has_header(true)
876 .schema(&schema)
877 .writer(Vec::new())
878 .build()?;
879 writer.flush()?;
880 let result = writer.into_inner()?;
881 assert_eq!(result.len(), 0);
882
883 Ok(())
884 }
885
886 #[test]
887 fn test_union_not_null() -> TestResult {
888 let schema = Schema::parse_str(UNION_SCHEMA)?;
889 let union = Value::Union(1, Box::new(Value::Long(3)));
890
891 let mut expected = Vec::new();
892 zig_i64(1, &mut expected)?;
893 zig_i64(3, &mut expected)?;
894
895 assert_eq!(to_avro_datum(&schema, union)?, expected);
896
897 Ok(())
898 }
899
900 #[test]
901 fn test_union_null() -> TestResult {
902 let schema = Schema::parse_str(UNION_SCHEMA)?;
903 let union = Value::Union(0, Box::new(Value::Null));
904
905 let mut expected = Vec::new();
906 zig_i64(0, &mut expected)?;
907
908 assert_eq!(to_avro_datum(&schema, union)?, expected);
909
910 Ok(())
911 }
912
913 fn logical_type_test<T: Into<Value> + Clone>(
914 schema_str: &'static str,
915
916 expected_schema: &Schema,
917 value: Value,
918
919 raw_schema: &Schema,
920 raw_value: T,
921 ) -> TestResult {
922 let schema = Schema::parse_str(schema_str)?;
923 assert_eq!(&schema, expected_schema);
924 let ser = to_avro_datum(&schema, value.clone())?;
926 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
927 assert_eq!(ser, raw_ser);
928
929 let mut r = ser.as_slice();
931 let de = crate::from_avro_datum(&schema, &mut r, None)?;
932 assert_eq!(de, value);
933 Ok(())
934 }
935
936 #[test]
937 fn date() -> TestResult {
938 logical_type_test(
939 r#"{"type": "int", "logicalType": "date"}"#,
940 &Schema::Date,
941 Value::Date(1_i32),
942 &Schema::Int,
943 1_i32,
944 )
945 }
946
947 #[test]
948 fn time_millis() -> TestResult {
949 logical_type_test(
950 r#"{"type": "int", "logicalType": "time-millis"}"#,
951 &Schema::TimeMillis,
952 Value::TimeMillis(1_i32),
953 &Schema::Int,
954 1_i32,
955 )
956 }
957
958 #[test]
959 fn time_micros() -> TestResult {
960 logical_type_test(
961 r#"{"type": "long", "logicalType": "time-micros"}"#,
962 &Schema::TimeMicros,
963 Value::TimeMicros(1_i64),
964 &Schema::Long,
965 1_i64,
966 )
967 }
968
969 #[test]
970 fn timestamp_millis() -> TestResult {
971 logical_type_test(
972 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
973 &Schema::TimestampMillis,
974 Value::TimestampMillis(1_i64),
975 &Schema::Long,
976 1_i64,
977 )
978 }
979
980 #[test]
981 fn timestamp_micros() -> TestResult {
982 logical_type_test(
983 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
984 &Schema::TimestampMicros,
985 Value::TimestampMicros(1_i64),
986 &Schema::Long,
987 1_i64,
988 )
989 }
990
991 #[test]
992 fn decimal_fixed() -> TestResult {
993 let size = 30;
994 let fixed = FixedSchema {
995 name: Name::new("decimal")?,
996 aliases: None,
997 doc: None,
998 size,
999 default: None,
1000 attributes: Default::default(),
1001 };
1002 let inner = InnerDecimalSchema::Fixed(fixed.clone());
1003 let value = vec![0u8; size];
1004 logical_type_test(
1005 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1006 &Schema::Decimal(DecimalSchema {
1007 precision: 20,
1008 scale: 5,
1009 inner,
1010 }),
1011 Value::Decimal(Decimal::from(value.clone())),
1012 &Schema::Fixed(fixed),
1013 Value::Fixed(size, value),
1014 )
1015 }
1016
1017 #[test]
1018 fn decimal_bytes() -> TestResult {
1019 let value = vec![0u8; 10];
1020 logical_type_test(
1021 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1022 &Schema::Decimal(DecimalSchema {
1023 precision: 4,
1024 scale: 3,
1025 inner: InnerDecimalSchema::Bytes,
1026 }),
1027 Value::Decimal(Decimal::from(value.clone())),
1028 &Schema::Bytes,
1029 value,
1030 )
1031 }
1032
1033 #[test]
1034 fn duration() -> TestResult {
1035 let inner = Schema::Fixed(FixedSchema {
1036 name: Name::new("duration")?,
1037 aliases: None,
1038 doc: None,
1039 size: 12,
1040 default: None,
1041 attributes: Default::default(),
1042 });
1043 let value = Value::Duration(Duration::new(
1044 Months::new(256),
1045 Days::new(512),
1046 Millis::new(1024),
1047 ));
1048 logical_type_test(
1049 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1050 &Schema::Duration,
1051 value,
1052 &inner,
1053 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1054 )
1055 }
1056
1057 #[test]
1058 fn test_writer_append() -> TestResult {
1059 let schema = Schema::parse_str(SCHEMA)?;
1060 let mut writer = Writer::new(&schema, Vec::new())?;
1061
1062 let mut record = Record::new(&schema).unwrap();
1063 record.put("a", 27i64);
1064 record.put("b", "foo");
1065
1066 let n1 = writer.append(record.clone())?;
1067 let n2 = writer.append(record.clone())?;
1068 let n3 = writer.flush()?;
1069 let result = writer.into_inner()?;
1070
1071 assert_eq!(n1 + n2 + n3, result.len());
1072
1073 let mut data = Vec::new();
1074 zig_i64(27, &mut data)?;
1075 zig_i64(3, &mut data)?;
1076 data.extend(b"foo");
1077 data.extend(data.clone());
1078
1079 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1081 let last_data_byte = result.len() - 16;
1083 assert_eq!(
1084 &result[last_data_byte - data.len()..last_data_byte],
1085 data.as_slice()
1086 );
1087
1088 Ok(())
1089 }
1090
1091 #[test]
1092 fn test_writer_extend() -> TestResult {
1093 let schema = Schema::parse_str(SCHEMA)?;
1094 let mut writer = Writer::new(&schema, Vec::new())?;
1095
1096 let mut record = Record::new(&schema).unwrap();
1097 record.put("a", 27i64);
1098 record.put("b", "foo");
1099 let record_copy = record.clone();
1100 let records = vec![record, record_copy];
1101
1102 let n1 = writer.extend(records)?;
1103 let n2 = writer.flush()?;
1104 let result = writer.into_inner()?;
1105
1106 assert_eq!(n1 + n2, result.len());
1107
1108 let mut data = Vec::new();
1109 zig_i64(27, &mut data)?;
1110 zig_i64(3, &mut data)?;
1111 data.extend(b"foo");
1112 data.extend(data.clone());
1113
1114 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1116 let last_data_byte = result.len() - 16;
1118 assert_eq!(
1119 &result[last_data_byte - data.len()..last_data_byte],
1120 data.as_slice()
1121 );
1122
1123 Ok(())
1124 }
1125
1126 #[derive(Debug, Clone, Deserialize, Serialize)]
1127 struct TestSerdeSerialize {
1128 a: i64,
1129 b: String,
1130 }
1131
1132 #[test]
1133 fn test_writer_append_ser() -> TestResult {
1134 let schema = Schema::parse_str(SCHEMA)?;
1135 let mut writer = Writer::new(&schema, Vec::new())?;
1136
1137 let record = TestSerdeSerialize {
1138 a: 27,
1139 b: "foo".to_owned(),
1140 };
1141
1142 let n1 = writer.append_ser(record)?;
1143 let n2 = writer.flush()?;
1144 let result = writer.into_inner()?;
1145
1146 assert_eq!(n1 + n2, result.len());
1147
1148 let mut data = Vec::new();
1149 zig_i64(27, &mut data)?;
1150 zig_i64(3, &mut data)?;
1151 data.extend(b"foo");
1152
1153 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1155 let last_data_byte = result.len() - 16;
1157 assert_eq!(
1158 &result[last_data_byte - data.len()..last_data_byte],
1159 data.as_slice()
1160 );
1161
1162 Ok(())
1163 }
1164
1165 #[test]
1166 fn test_writer_extend_ser() -> TestResult {
1167 let schema = Schema::parse_str(SCHEMA)?;
1168 let mut writer = Writer::new(&schema, Vec::new())?;
1169
1170 let record = TestSerdeSerialize {
1171 a: 27,
1172 b: "foo".to_owned(),
1173 };
1174 let record_copy = record.clone();
1175 let records = vec![record, record_copy];
1176
1177 let n1 = writer.extend_ser(records)?;
1178 let n2 = writer.flush()?;
1179 let result = writer.into_inner()?;
1180
1181 assert_eq!(n1 + n2, result.len());
1182
1183 let mut data = Vec::new();
1184 zig_i64(27, &mut data)?;
1185 zig_i64(3, &mut data)?;
1186 data.extend(b"foo");
1187 data.extend(data.clone());
1188
1189 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1191 let last_data_byte = result.len() - 16;
1193 assert_eq!(
1194 &result[last_data_byte - data.len()..last_data_byte],
1195 data.as_slice()
1196 );
1197
1198 Ok(())
1199 }
1200
1201 fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1202 Writer::with_codec(
1203 schema,
1204 Vec::new(),
1205 Codec::Deflate(DeflateSettings::default()),
1206 )
1207 }
1208
1209 fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1210 Writer::builder()
1211 .writer(Vec::new())
1212 .schema(schema)
1213 .codec(Codec::Deflate(DeflateSettings::default()))
1214 .block_size(100)
1215 .build()
1216 }
1217
1218 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1219 let mut record = Record::new(schema).unwrap();
1220 record.put("a", 27i64);
1221 record.put("b", "foo");
1222
1223 let n1 = writer.append(record.clone())?;
1224 let n2 = writer.append(record.clone())?;
1225 let n3 = writer.flush()?;
1226 let result = writer.into_inner()?;
1227
1228 assert_eq!(n1 + n2 + n3, result.len());
1229
1230 let mut data = Vec::new();
1231 zig_i64(27, &mut data)?;
1232 zig_i64(3, &mut data)?;
1233 data.extend(b"foo");
1234 data.extend(data.clone());
1235 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1236
1237 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1239 let last_data_byte = result.len() - 16;
1241 assert_eq!(
1242 &result[last_data_byte - data.len()..last_data_byte],
1243 data.as_slice()
1244 );
1245
1246 Ok(())
1247 }
1248
1249 #[test]
1250 fn test_writer_with_codec() -> TestResult {
1251 let schema = Schema::parse_str(SCHEMA)?;
1252 let writer = make_writer_with_codec(&schema)?;
1253 check_writer(writer, &schema)
1254 }
1255
1256 #[test]
1257 fn test_writer_with_builder() -> TestResult {
1258 let schema = Schema::parse_str(SCHEMA)?;
1259 let writer = make_writer_with_builder(&schema)?;
1260 check_writer(writer, &schema)
1261 }
1262
1263 #[test]
1264 fn test_logical_writer() -> TestResult {
1265 const LOGICAL_TYPE_SCHEMA: &str = r#"
1266 {
1267 "type": "record",
1268 "name": "logical_type_test",
1269 "fields": [
1270 {
1271 "name": "a",
1272 "type": [
1273 "null",
1274 {
1275 "type": "long",
1276 "logicalType": "timestamp-micros"
1277 }
1278 ]
1279 }
1280 ]
1281 }
1282 "#;
1283 let codec = Codec::Deflate(DeflateSettings::default());
1284 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1285 let mut writer = Writer::builder()
1286 .schema(&schema)
1287 .codec(codec)
1288 .writer(Vec::new())
1289 .build()?;
1290
1291 let mut record1 = Record::new(&schema).unwrap();
1292 record1.put(
1293 "a",
1294 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1295 );
1296
1297 let mut record2 = Record::new(&schema).unwrap();
1298 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1299
1300 let n1 = writer.append(record1)?;
1301 let n2 = writer.append(record2)?;
1302 let n3 = writer.flush()?;
1303 let result = writer.into_inner()?;
1304
1305 assert_eq!(n1 + n2 + n3, result.len());
1306
1307 let mut data = Vec::new();
1308 zig_i64(1, &mut data)?;
1310 zig_i64(1234, &mut data)?;
1311
1312 zig_i64(0, &mut data)?;
1314 codec.compress(&mut data)?;
1315
1316 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1318 let last_data_byte = result.len() - 16;
1320 assert_eq!(
1321 &result[last_data_byte - data.len()..last_data_byte],
1322 data.as_slice()
1323 );
1324
1325 Ok(())
1326 }
1327
1328 #[test]
1329 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1330 let schema = Schema::parse_str(SCHEMA)?;
1331 let mut writer = Writer::new(&schema, Vec::new())?;
1332
1333 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1334 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1335 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1336 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1337
1338 let mut record = Record::new(&schema).unwrap();
1339 record.put("a", 27i64);
1340 record.put("b", "foo");
1341
1342 writer.append(record.clone())?;
1343 writer.append(record.clone())?;
1344 writer.flush()?;
1345 let result = writer.into_inner()?;
1346
1347 assert_eq!(result.len(), 244);
1348
1349 Ok(())
1350 }
1351
1352 #[test]
1353 fn test_avro_3881_metadata_empty_body() -> TestResult {
1354 let schema = Schema::parse_str(SCHEMA)?;
1355 let mut writer = Writer::new(&schema, Vec::new())?;
1356 writer.add_user_metadata("a".to_string(), "b")?;
1357 let result = writer.into_inner()?;
1358
1359 let reader = Reader::with_schema(&schema, &result[..])?;
1360 let mut expected = HashMap::new();
1361 expected.insert("a".to_string(), vec![b'b']);
1362 assert_eq!(reader.user_metadata(), &expected);
1363 assert_eq!(reader.into_iter().count(), 0);
1364
1365 Ok(())
1366 }
1367
1368 #[test]
1369 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1370 let schema = Schema::parse_str(SCHEMA)?;
1371 let mut writer = Writer::new(&schema, Vec::new())?;
1372
1373 let mut record = Record::new(&schema).unwrap();
1374 record.put("a", 27i64);
1375 record.put("b", "foo");
1376 writer.append(record.clone())?;
1377
1378 match writer
1379 .add_user_metadata("stringKey".to_string(), String::from("value2"))
1380 .map_err(Error::into_details)
1381 {
1382 Err(e @ Details::FileHeaderAlreadyWritten) => {
1383 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1384 }
1385 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1386 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1387 }
1388
1389 Ok(())
1390 }
1391
1392 #[test]
1393 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1394 let schema = Schema::parse_str(SCHEMA)?;
1395 let mut writer = Writer::new(&schema, Vec::new())?;
1396
1397 let key = "avro.stringKey".to_string();
1398 match writer
1399 .add_user_metadata(key.clone(), "value")
1400 .map_err(Error::into_details)
1401 {
1402 Err(ref e @ Details::InvalidMetadataKey(_)) => {
1403 assert_eq!(
1404 e.to_string(),
1405 format!(
1406 "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1407 )
1408 )
1409 }
1410 Err(e) => panic!(
1411 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1412 ),
1413 Ok(_) => {
1414 panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1415 }
1416 }
1417
1418 Ok(())
1419 }
1420
1421 #[test]
1422 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1423 let schema = Schema::parse_str(SCHEMA)?;
1424
1425 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1426 user_meta_data.insert(
1427 "stringKey".to_string(),
1428 Value::String("stringValue".to_string()),
1429 );
1430 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1431 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1432
1433 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1434 .writer(Vec::new())
1435 .schema(&schema)
1436 .user_metadata(user_meta_data.clone())
1437 .build()?;
1438
1439 assert_eq!(writer.user_metadata, user_meta_data);
1440
1441 Ok(())
1442 }
1443
1444 #[derive(Serialize, Clone)]
1445 struct TestSingleObjectWriter {
1446 a: i64,
1447 b: f64,
1448 c: Vec<String>,
1449 }
1450
1451 impl AvroSchema for TestSingleObjectWriter {
1452 fn get_schema() -> Schema {
1453 let schema = r#"
1454 {
1455 "type":"record",
1456 "name":"TestSingleObjectWrtierSerialize",
1457 "fields":[
1458 {
1459 "name":"a",
1460 "type":"long"
1461 },
1462 {
1463 "name":"b",
1464 "type":"double"
1465 },
1466 {
1467 "name":"c",
1468 "type":{
1469 "type":"array",
1470 "items":"string"
1471 }
1472 }
1473 ]
1474 }
1475 "#;
1476 Schema::parse_str(schema).unwrap()
1477 }
1478 }
1479
1480 impl From<TestSingleObjectWriter> for Value {
1481 fn from(obj: TestSingleObjectWriter) -> Value {
1482 Value::Record(vec![
1483 ("a".into(), obj.a.into()),
1484 ("b".into(), obj.b.into()),
1485 (
1486 "c".into(),
1487 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1488 ),
1489 ])
1490 }
1491 }
1492
1493 #[test]
1494 fn test_single_object_writer() -> TestResult {
1495 let mut buf: Vec<u8> = Vec::new();
1496 let obj = TestSingleObjectWriter {
1497 a: 300,
1498 b: 34.555,
1499 c: vec!["cat".into(), "dog".into()],
1500 };
1501 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1502 &TestSingleObjectWriter::get_schema(),
1503 1024,
1504 )
1505 .expect("Should resolve schema");
1506 let value = obj.into();
1507 let written_bytes = writer
1508 .write_value_ref(&value, &mut buf)
1509 .expect("Error serializing properly");
1510
1511 assert!(buf.len() > 10, "no bytes written");
1512 assert_eq!(buf.len(), written_bytes);
1513 assert_eq!(buf[0], 0xC3);
1514 assert_eq!(buf[1], 0x01);
1515 assert_eq!(
1516 &buf[2..10],
1517 &TestSingleObjectWriter::get_schema()
1518 .fingerprint::<Rabin>()
1519 .bytes[..]
1520 );
1521 let mut msg_binary = Vec::new();
1522 encode(
1523 &value,
1524 &TestSingleObjectWriter::get_schema(),
1525 &mut msg_binary,
1526 )
1527 .expect("encode should have failed by here as a dependency of any writing");
1528 assert_eq!(&buf[10..], &msg_binary[..]);
1529
1530 Ok(())
1531 }
1532
1533 #[test]
1534 fn test_single_object_writer_with_header_builder() -> TestResult {
1535 let mut buf: Vec<u8> = Vec::new();
1536 let obj = TestSingleObjectWriter {
1537 a: 300,
1538 b: 34.555,
1539 c: vec!["cat".into(), "dog".into()],
1540 };
1541 let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1542 let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1543 let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1544 &TestSingleObjectWriter::get_schema(),
1545 1024,
1546 header_builder,
1547 )
1548 .expect("Should resolve schema");
1549 let value = obj.into();
1550 writer
1551 .write_value_ref(&value, &mut buf)
1552 .expect("Error serializing properly");
1553
1554 assert_eq!(buf[0], 0x03);
1555 assert_eq!(buf[1], 0x00);
1556 assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1557 Ok(())
1558 }
1559
1560 #[test]
1561 fn test_writer_parity() -> TestResult {
1562 let obj1 = TestSingleObjectWriter {
1563 a: 300,
1564 b: 34.555,
1565 c: vec!["cat".into(), "dog".into()],
1566 };
1567
1568 let mut buf1: Vec<u8> = Vec::new();
1569 let mut buf2: Vec<u8> = Vec::new();
1570 let mut buf3: Vec<u8> = Vec::new();
1571
1572 let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1573 &TestSingleObjectWriter::get_schema(),
1574 1024,
1575 )
1576 .expect("Should resolve schema");
1577 let mut specific_writer =
1578 SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1579 .expect("Resolved should pass");
1580 specific_writer
1581 .write(obj1.clone(), &mut buf1)
1582 .expect("Serialization expected");
1583 specific_writer
1584 .write_value(obj1.clone(), &mut buf2)
1585 .expect("Serialization expected");
1586 generic_writer
1587 .write_value(obj1.into(), &mut buf3)
1588 .expect("Serialization expected");
1589 assert_eq!(buf1, buf2);
1590 assert_eq!(buf1, buf3);
1591
1592 Ok(())
1593 }
1594
1595 #[test]
1596 fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1597 const SCHEMA: &str = r#"
1598 {
1599 "type": "record",
1600 "name": "Conference",
1601 "fields": [
1602 {"type": "string", "name": "name"},
1603 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1604 ]
1605 }"#;
1606
1607 #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1608 pub struct Conference {
1609 pub name: String,
1610 pub time: Option<i64>,
1611 }
1612
1613 let conf = Conference {
1614 name: "RustConf".to_string(),
1615 time: Some(1234567890),
1616 };
1617
1618 let schema = Schema::parse_str(SCHEMA)?;
1619 let mut writer = Writer::new(&schema, Vec::new())?;
1620
1621 let bytes = writer.append_ser(conf)?;
1622
1623 assert_eq!(182, bytes);
1624
1625 Ok(())
1626 }
1627
1628 #[test]
1629 fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1630 const SCHEMA: &str = r#"
1631 {
1632 "type": "record",
1633 "name": "Conference",
1634 "fields": [
1635 {"type": "string", "name": "name"},
1636 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1637 ]
1638 }"#;
1639
1640 #[derive(Debug, PartialEq, Clone, Serialize)]
1641 pub struct Conference {
1642 pub name: String,
1643 pub time: Option<f64>, }
1645
1646 let conf = Conference {
1647 name: "RustConf".to_string(),
1648 time: Some(12345678.90),
1649 };
1650
1651 let schema = Schema::parse_str(SCHEMA)?;
1652 let mut writer = Writer::new(&schema, Vec::new())?;
1653
1654 match writer.append_ser(conf) {
1655 Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1656 Err(e) => {
1657 assert_eq!(
1658 e.to_string(),
1659 r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1660 );
1661 }
1662 }
1663 Ok(())
1664 }
1665
1666 #[test]
1667 fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1668 const SCHEMA: &str = r#"
1669 {
1670 "type": "record",
1671 "name": "ExampleSchema",
1672 "fields": [
1673 {"name": "exampleField", "type": "string"}
1674 ]
1675 }
1676 "#;
1677
1678 #[derive(Clone, Default)]
1679 struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1680
1681 impl TestBuffer {
1682 fn len(&self) -> usize {
1683 self.0.borrow().len()
1684 }
1685 }
1686
1687 impl Write for TestBuffer {
1688 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1689 self.0.borrow_mut().write(buf)
1690 }
1691
1692 fn flush(&mut self) -> std::io::Result<()> {
1693 Ok(())
1694 }
1695 }
1696
1697 let shared_buffer = TestBuffer::default();
1698
1699 let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1700
1701 let schema = Schema::parse_str(SCHEMA)?;
1702
1703 let mut writer = Writer::new(&schema, buffered_writer)?;
1704
1705 let mut record = Record::new(writer.schema()).unwrap();
1706 record.put("exampleField", "value");
1707
1708 writer.append(record)?;
1709 writer.flush()?;
1710
1711 assert_eq!(
1712 shared_buffer.len(),
1713 151,
1714 "the test buffer was not fully written to after Writer::flush was called"
1715 );
1716
1717 Ok(())
1718 }
1719}