1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 ser_schema::SchemaAwareWriteSerializer,
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default for the builder's `block_size`: used as the initial capacity of the block buffer
/// and as the buffered-byte threshold at which the `append*` methods flush a block.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Leading bytes of the header produced by `Writer::header`: ASCII `Obj` followed by `0x01`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Buffers encoded values and writes them, block by block, to an underlying [`Write`].
///
/// Values are validated and encoded against `schema`, accumulated in `buffer`, and emitted by
/// `flush` as a block followed by the sync `marker`.
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is validated and encoded against.
    schema: &'a Schema,
    /// Destination of the header and of all flushed blocks.
    writer: W,
    /// Name lookup built from `schema`, or from the `schemata` passed to the builder.
    resolved_schema: ResolvedSchema<'a>,
    /// Codec applied to the buffered block when it is flushed.
    codec: Codec,
    /// Buffered-byte threshold at which the `append*` methods trigger a flush.
    block_size: usize,
    /// Encoded values that have not been flushed yet.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// 16-byte sync marker written at the end of the header and after every block.
    marker: [u8; 16],
    /// `true` once the header was written (or when the caller declared it already exists).
    has_header: bool,
    /// Extra metadata entries copied into the header when it is written.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56 #[builder]
57 pub fn builder(
58 schema: &'a Schema,
59 schemata: Option<Vec<&'a Schema>>,
60 writer: W,
61 #[builder(default = Codec::Null)] codec: Codec,
62 #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63 #[builder(default = generate_sync_marker())] marker: [u8; 16],
64 #[builder(default = false)]
68 has_header: bool,
69 #[builder(default)] user_metadata: HashMap<String, Value>,
70 ) -> AvroResult<Self> {
71 let resolved_schema = if let Some(schemata) = schemata {
72 ResolvedSchema::try_from(schemata)?
73 } else {
74 ResolvedSchema::try_from(schema)?
75 };
76 Ok(Self {
77 schema,
78 writer,
79 resolved_schema,
80 codec,
81 block_size,
82 buffer: Vec::with_capacity(block_size),
83 num_values: 0,
84 marker,
85 has_header,
86 user_metadata,
87 })
88 }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
    /// Creates a `Writer` for `schema` that writes to `writer` with `Codec::Null`.
    ///
    /// Delegates to [`Writer::with_codec`]; all other settings keep their builder defaults.
    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
        Writer::with_codec(schema, writer, Codec::Null)
    }
98
    /// Creates a `Writer` for `schema` that writes to `writer` and compresses blocks with
    /// `codec`. All other settings keep their builder defaults.
    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
        Self::builder()
            .schema(schema)
            .writer(writer)
            .codec(codec)
            .build()
    }
108
    /// Creates a `Writer` for `schema` whose names are resolved against `schemata`.
    ///
    /// Values are still validated and encoded against `schema`; `schemata` only feeds the
    /// name resolution performed by the builder.
    pub fn with_schemata(
        schema: &'a Schema,
        schemata: Vec<&'a Schema>,
        writer: W,
        codec: Codec,
    ) -> AvroResult<Self> {
        Self::builder()
            .schema(schema)
            .schemata(schemata)
            .writer(writer)
            .codec(codec)
            .build()
    }
126
    /// Creates a `Writer` that continues existing output: no header is written and blocks are
    /// terminated with the given `marker`. Uses `Codec::Null`.
    ///
    /// Delegates to [`Writer::append_to_with_codec`].
    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
    }
133
    /// Creates a `Writer` that continues existing output using `codec`.
    ///
    /// The writer is built with `has_header(true)`, so it never emits a header, and every
    /// block it writes ends with the supplied `marker`.
    pub fn append_to_with_codec(
        schema: &'a Schema,
        writer: W,
        codec: Codec,
        marker: [u8; 16],
    ) -> AvroResult<Self> {
        Self::builder()
            .schema(schema)
            .writer(writer)
            .codec(codec)
            .marker(marker)
            .has_header(true)
            .build()
    }
150
    /// Creates a `Writer` that continues existing output using `codec`, resolving names
    /// against `schemata`.
    ///
    /// The writer is built with `has_header(true)`, so it never emits a header, and every
    /// block it writes ends with the supplied `marker`.
    pub fn append_to_with_codec_schemata(
        schema: &'a Schema,
        schemata: Vec<&'a Schema>,
        writer: W,
        codec: Codec,
        marker: [u8; 16],
    ) -> AvroResult<Self> {
        Self::builder()
            .schema(schema)
            .schemata(schemata)
            .writer(writer)
            .codec(codec)
            .marker(marker)
            .has_header(true)
            .build()
    }
169
    /// Returns the schema this `Writer` validates and encodes values against.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }
174
175 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184 let n = self.maybe_write_header()?;
185
186 let avro = value.into();
187 self.append_value_ref(&avro).map(|m| m + n)
188 }
189
190 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198 let n = self.maybe_write_header()?;
199
200 write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201 self.num_values += 1;
202
203 if self.buffer.len() >= self.block_size {
204 return self.flush().map(|b| b + n);
205 }
206
207 Ok(n)
208 }
209
210 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220 let n = self.maybe_write_header()?;
221
222 let mut serializer = SchemaAwareWriteSerializer::new(
223 &mut self.buffer,
224 self.schema,
225 self.resolved_schema.get_names(),
226 None,
227 );
228 value.serialize(&mut serializer)?;
229 self.num_values += 1;
230
231 if self.buffer.len() >= self.block_size {
232 return self.flush().map(|b| b + n);
233 }
234
235 Ok(n)
236 }
237
238 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246 where
247 I: IntoIterator<Item = T>,
248 {
249 let mut num_bytes = 0;
264 for value in values {
265 num_bytes += self.append(value)?;
266 }
267 num_bytes += self.flush()?;
268
269 Ok(num_bytes)
270 }
271
272 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281 where
282 I: IntoIterator<Item = T>,
283 {
284 let mut num_bytes = 0;
299 for value in values {
300 num_bytes += self.append_ser(value)?;
301 }
302 num_bytes += self.flush()?;
303
304 Ok(num_bytes)
305 }
306
307 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315 let mut num_bytes = 0;
316 for value in values {
317 num_bytes += self.append_value_ref(value)?;
318 }
319 num_bytes += self.flush()?;
320
321 Ok(num_bytes)
322 }
323
324 pub fn flush(&mut self) -> AvroResult<usize> {
332 let mut num_bytes = self.maybe_write_header()?;
333 if self.num_values == 0 {
334 return Ok(num_bytes);
335 }
336
337 self.codec.compress(&mut self.buffer)?;
338
339 let num_values = self.num_values;
340 let stream_len = self.buffer.len();
341
342 num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343 + self.append_raw(&stream_len.into(), &Schema::Long)?
344 + self
345 .writer
346 .write(self.buffer.as_ref())
347 .map_err(Details::WriteBytes)?
348 + self.append_marker()?;
349
350 self.buffer.clear();
351 self.num_values = 0;
352
353 self.writer.flush().map_err(Details::FlushWriter)?;
354
355 Ok(num_bytes)
356 }
357
    /// Flushes any pending data and returns the underlying writer, consuming `self`.
    ///
    /// # Errors
    ///
    /// Fails when writing the header or flushing fails; in that case `self` is dropped
    /// normally and the underlying writer is not returned.
    pub fn into_inner(mut self) -> AvroResult<W> {
        // `flush` also starts by writing the header if needed, so this first call only makes
        // that step explicit.
        self.maybe_write_header()?;
        self.flush()?;

        // From here on `Drop for Writer` must not run, because the writer field is moved out.
        let mut this = ManuallyDrop::new(self);

        // Release the owned members by hand, since `this` itself will never be dropped.
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);
        // SAFETY: `resolved_schema` is not accessed after this point, and it cannot be dropped
        // a second time because `this` is wrapped in `ManuallyDrop`.
        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };

        // SAFETY: the value is read exactly once and `this` is never dropped, so the writer is
        // not dropped twice.
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
379
    /// Returns a shared reference to the underlying writer.
    ///
    /// Values that are still buffered have not reached it yet.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
387
    /// Returns a mutable reference to the underlying writer.
    ///
    /// Bytes written through this reference bypass the `Writer` and end up interleaved with
    /// the header and blocks it emits.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
397
398 fn append_marker(&mut self) -> AvroResult<usize> {
400 self.writer
403 .write(&self.marker)
404 .map_err(|e| Details::WriteMarker(e).into())
405 }
406
407 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410 }
411
412 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414 self.writer
415 .write(bytes)
416 .map_err(|e| Details::WriteBytes(e).into())
417 }
418
419 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422 if !self.has_header {
423 if key.starts_with("avro.") {
424 return Err(Details::InvalidMetadataKey(key).into());
425 }
426 self.user_metadata
427 .insert(key, Value::Bytes(value.as_ref().to_vec()));
428 Ok(())
429 } else {
430 Err(Details::FileHeaderAlreadyWritten.into())
431 }
432 }
433
434 fn header(&self) -> Result<Vec<u8>, Error> {
436 let schema_bytes = serde_json::to_string(self.schema)
437 .map_err(Details::ConvertJsonToString)?
438 .into_bytes();
439
440 let mut metadata = HashMap::with_capacity(2);
441 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
442 if self.codec != Codec::Null {
443 metadata.insert("avro.codec", self.codec.into());
444 }
445 match self.codec {
446 #[cfg(feature = "bzip")]
447 Codec::Bzip2(settings) => {
448 metadata.insert(
449 "avro.codec.compression_level",
450 Value::Bytes(vec![settings.compression_level]),
451 );
452 }
453 #[cfg(feature = "xz")]
454 Codec::Xz(settings) => {
455 metadata.insert(
456 "avro.codec.compression_level",
457 Value::Bytes(vec![settings.compression_level]),
458 );
459 }
460 #[cfg(feature = "zstandard")]
461 Codec::Zstandard(settings) => {
462 metadata.insert(
463 "avro.codec.compression_level",
464 Value::Bytes(vec![settings.compression_level]),
465 );
466 }
467 _ => {}
468 }
469
470 for (k, v) in &self.user_metadata {
471 metadata.insert(k.as_str(), v.clone());
472 }
473
474 let mut header = Vec::new();
475 header.extend_from_slice(AVRO_OBJECT_HEADER);
476 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
477 header.extend_from_slice(&self.marker);
478
479 Ok(header)
480 }
481
482 fn maybe_write_header(&mut self) -> AvroResult<usize> {
483 if !self.has_header {
484 let header = self.header()?;
485 let n = self.append_bytes(header.as_ref())?;
486 self.has_header = true;
487 Ok(n)
488 } else {
489 Ok(0)
490 }
491 }
492}
493
/// Writes the header (if still missing) and flushes buffered values when the `Writer` is
/// dropped.
impl<W: Write> Drop for Writer<'_, W> {
    fn drop(&mut self) {
        // `drop` has no way to report failures, so both results are deliberately ignored.
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
501
502fn write_avro_datum<T: Into<Value>, W: Write>(
508 schema: &Schema,
509 value: T,
510 writer: &mut W,
511) -> Result<(), Error> {
512 let avro = value.into();
513 if !avro.validate(schema) {
514 return Err(Details::Validation.into());
515 }
516 encode(&avro, schema, writer)?;
517 Ok(())
518}
519
520fn write_avro_datum_schemata<T: Into<Value>>(
521 schema: &Schema,
522 schemata: Vec<&Schema>,
523 value: T,
524 buffer: &mut Vec<u8>,
525) -> AvroResult<usize> {
526 let avro = value.into();
527 let rs = ResolvedSchema::try_from(schemata)?;
528 let names = rs.get_names();
529 let enclosing_namespace = schema.namespace();
530 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
531 return Err(Details::Validation.into());
532 }
533 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
534}
535
/// Writer that emits each [`Value`] as a header followed by the value's encoding.
pub struct GenericSingleObjectWriter {
    /// Holds the header bytes; a value's encoding is appended temporarily while it is
    /// being written.
    buffer: Vec<u8>,
    /// Owned, resolved copy of the schema used for validation and encoding.
    resolved: ResolvedOwnedSchema,
}
543
544impl GenericSingleObjectWriter {
545 pub fn new_with_capacity(
546 schema: &Schema,
547 initial_buffer_cap: usize,
548 ) -> AvroResult<GenericSingleObjectWriter> {
549 let header_builder = RabinFingerprintHeader::from_schema(schema);
550 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
551 }
552
553 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
554 schema: &Schema,
555 initial_buffer_cap: usize,
556 header_builder: HB,
557 ) -> AvroResult<GenericSingleObjectWriter> {
558 let mut buffer = Vec::with_capacity(initial_buffer_cap);
559 let header = header_builder.build_header();
560 buffer.extend_from_slice(&header);
561
562 Ok(GenericSingleObjectWriter {
563 buffer,
564 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
565 })
566 }
567
568 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
569
570 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
572 let original_length = self.buffer.len();
573 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
574 Err(Details::IllegalSingleObjectWriterState.into())
575 } else {
576 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
577 writer
578 .write_all(&self.buffer)
579 .map_err(Details::WriteBytes)?;
580 let len = self.buffer.len();
581 self.buffer.truncate(original_length);
582 Ok(len)
583 }
584 }
585
586 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
588 self.write_value_ref(&v, writer)
589 }
590}
591
/// Single-object writer bound to a Rust type `T` whose schema comes from [`AvroSchema`].
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Generic writer built from `T`'s schema; also holds the header bytes.
    inner: GenericSingleObjectWriter,
    /// Schema obtained from `T::get_schema()`, used by the serde-based write path.
    schema: Schema,
    /// Set once `write_ref` has emitted the header.
    header_written: bool,
    /// Ties the writer to `T` without storing a value of it.
    _model: PhantomData<T>,
}
602
603impl<T> SpecificSingleObjectWriter<T>
604where
605 T: AvroSchema,
606{
607 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
608 let schema = T::get_schema();
609 Ok(SpecificSingleObjectWriter {
610 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
611 schema,
612 header_written: false,
613 _model: PhantomData,
614 })
615 }
616}
617
618impl<T> SpecificSingleObjectWriter<T>
619where
620 T: AvroSchema + Into<Value>,
621{
622 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
625 let v: Value = data.into();
626 self.inner.write_value_ref(&v, writer)
627 }
628}
629
630impl<T> SpecificSingleObjectWriter<T>
631where
632 T: AvroSchema + Serialize,
633{
634 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
637 let mut bytes_written: usize = 0;
638
639 if !self.header_written {
640 bytes_written += writer
641 .write(self.inner.buffer.as_slice())
642 .map_err(Details::WriteBytes)?;
643 self.header_written = true;
644 }
645
646 bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
647
648 Ok(bytes_written)
649 }
650
651 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
654 self.write_ref(&data, writer)
655 }
656}
657
658fn write_value_ref_resolved(
659 schema: &Schema,
660 resolved_schema: &ResolvedSchema,
661 value: &Value,
662 buffer: &mut Vec<u8>,
663) -> AvroResult<usize> {
664 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
665 Some(reason) => Err(Details::ValidationWithReason {
666 value: value.clone(),
667 schema: schema.clone(),
668 reason,
669 }
670 .into()),
671 None => encode_internal(
672 value,
673 schema,
674 resolved_schema.get_names(),
675 &schema.namespace(),
676 buffer,
677 ),
678 }
679}
680
681fn write_value_ref_owned_resolved(
682 resolved_schema: &ResolvedOwnedSchema,
683 value: &Value,
684 buffer: &mut Vec<u8>,
685) -> AvroResult<()> {
686 let root_schema = resolved_schema.get_root_schema();
687 if let Some(reason) = value.validate_internal(
688 root_schema,
689 resolved_schema.get_names(),
690 &root_schema.namespace(),
691 ) {
692 return Err(Details::ValidationWithReason {
693 value: value.clone(),
694 schema: root_schema.clone(),
695 reason,
696 }
697 .into());
698 }
699 encode_internal(
700 value,
701 root_schema,
702 resolved_schema.get_names(),
703 &root_schema.namespace(),
704 buffer,
705 )?;
706 Ok(())
707}
708
709pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
716 let mut buffer = Vec::new();
717 write_avro_datum(schema, value, &mut buffer)?;
718 Ok(buffer)
719}
720
721pub fn write_avro_datum_ref<T: Serialize, W: Write>(
728 schema: &Schema,
729 data: &T,
730 writer: &mut W,
731) -> AvroResult<usize> {
732 let names: HashMap<Name, &Schema> = HashMap::new();
733 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
734 let bytes_written = data.serialize(&mut serializer)?;
735 Ok(bytes_written)
736}
737
738pub fn to_avro_datum_schemata<T: Into<Value>>(
743 schema: &Schema,
744 schemata: Vec<&Schema>,
745 value: T,
746) -> AvroResult<Vec<u8>> {
747 let mut buffer = Vec::new();
748 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
749 Ok(buffer)
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn generate_sync_marker() -> [u8; 16] {
754 let mut marker = [0_u8; 16];
755 std::iter::repeat_with(rand::random)
756 .take(16)
757 .enumerate()
758 .for_each(|(i, n)| marker[i] = n);
759 marker
760}
761
/// Generates a 16-byte sync marker on `wasm32` from four `quad_rand::rand` values, each
/// stored as four big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
772
773#[cfg(test)]
774mod tests {
775 use std::{cell::RefCell, rc::Rc};
776
777 use super::*;
778 use crate::{
779 Reader,
780 decimal::Decimal,
781 duration::{Days, Duration, Millis, Months},
782 headers::GlueSchemaUuidHeader,
783 rabin::Rabin,
784 schema::{DecimalSchema, FixedSchema, Name},
785 types::Record,
786 util::zig_i64,
787 };
788 use pretty_assertions::assert_eq;
789 use serde::{Deserialize, Serialize};
790 use uuid::Uuid;
791
792 use crate::{codec::DeflateSettings, error::Details};
793 use apache_avro_test_helper::TestResult;
794
795 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
796
797 const SCHEMA: &str = r#"
798 {
799 "type": "record",
800 "name": "test",
801 "fields": [
802 {
803 "name": "a",
804 "type": "long",
805 "default": 42
806 },
807 {
808 "name": "b",
809 "type": "string"
810 }
811 ]
812 }
813 "#;
814
815 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
816
817 #[test]
818 fn test_to_avro_datum() -> TestResult {
819 let schema = Schema::parse_str(SCHEMA)?;
820 let mut record = Record::new(&schema).unwrap();
821 record.put("a", 27i64);
822 record.put("b", "foo");
823
824 let mut expected = Vec::new();
825 zig_i64(27, &mut expected)?;
826 zig_i64(3, &mut expected)?;
827 expected.extend([b'f', b'o', b'o']);
828
829 assert_eq!(to_avro_datum(&schema, record)?, expected);
830
831 Ok(())
832 }
833
834 #[test]
835 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
836 #[derive(Serialize)]
837 struct TestStruct {
838 a: i64,
839 b: String,
840 }
841
842 let schema = Schema::parse_str(SCHEMA)?;
843 let mut writer: Vec<u8> = Vec::new();
844 let data = TestStruct {
845 a: 27,
846 b: "foo".to_string(),
847 };
848
849 let mut expected = Vec::new();
850 zig_i64(27, &mut expected)?;
851 zig_i64(3, &mut expected)?;
852 expected.extend([b'f', b'o', b'o']);
853
854 let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
855
856 assert_eq!(bytes, expected.len());
857 assert_eq!(writer, expected);
858
859 Ok(())
860 }
861
862 #[test]
863 fn avro_rs_220_flush_write_header() -> TestResult {
864 let schema = Schema::parse_str(SCHEMA)?;
865
866 let mut writer = Writer::new(&schema, Vec::new())?;
868 writer.flush()?;
869 let result = writer.into_inner()?;
870 assert_eq!(result.len(), 147);
871
872 let mut writer = Writer::builder()
874 .has_header(true)
875 .schema(&schema)
876 .writer(Vec::new())
877 .build()?;
878 writer.flush()?;
879 let result = writer.into_inner()?;
880 assert_eq!(result.len(), 0);
881
882 Ok(())
883 }
884
885 #[test]
886 fn test_union_not_null() -> TestResult {
887 let schema = Schema::parse_str(UNION_SCHEMA)?;
888 let union = Value::Union(1, Box::new(Value::Long(3)));
889
890 let mut expected = Vec::new();
891 zig_i64(1, &mut expected)?;
892 zig_i64(3, &mut expected)?;
893
894 assert_eq!(to_avro_datum(&schema, union)?, expected);
895
896 Ok(())
897 }
898
899 #[test]
900 fn test_union_null() -> TestResult {
901 let schema = Schema::parse_str(UNION_SCHEMA)?;
902 let union = Value::Union(0, Box::new(Value::Null));
903
904 let mut expected = Vec::new();
905 zig_i64(0, &mut expected)?;
906
907 assert_eq!(to_avro_datum(&schema, union)?, expected);
908
909 Ok(())
910 }
911
912 fn logical_type_test<T: Into<Value> + Clone>(
913 schema_str: &'static str,
914
915 expected_schema: &Schema,
916 value: Value,
917
918 raw_schema: &Schema,
919 raw_value: T,
920 ) -> TestResult {
921 let schema = Schema::parse_str(schema_str)?;
922 assert_eq!(&schema, expected_schema);
923 let ser = to_avro_datum(&schema, value.clone())?;
925 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
926 assert_eq!(ser, raw_ser);
927
928 let mut r = ser.as_slice();
930 let de = crate::from_avro_datum(&schema, &mut r, None)?;
931 assert_eq!(de, value);
932 Ok(())
933 }
934
935 #[test]
936 fn date() -> TestResult {
937 logical_type_test(
938 r#"{"type": "int", "logicalType": "date"}"#,
939 &Schema::Date,
940 Value::Date(1_i32),
941 &Schema::Int,
942 1_i32,
943 )
944 }
945
946 #[test]
947 fn time_millis() -> TestResult {
948 logical_type_test(
949 r#"{"type": "int", "logicalType": "time-millis"}"#,
950 &Schema::TimeMillis,
951 Value::TimeMillis(1_i32),
952 &Schema::Int,
953 1_i32,
954 )
955 }
956
957 #[test]
958 fn time_micros() -> TestResult {
959 logical_type_test(
960 r#"{"type": "long", "logicalType": "time-micros"}"#,
961 &Schema::TimeMicros,
962 Value::TimeMicros(1_i64),
963 &Schema::Long,
964 1_i64,
965 )
966 }
967
968 #[test]
969 fn timestamp_millis() -> TestResult {
970 logical_type_test(
971 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
972 &Schema::TimestampMillis,
973 Value::TimestampMillis(1_i64),
974 &Schema::Long,
975 1_i64,
976 )
977 }
978
979 #[test]
980 fn timestamp_micros() -> TestResult {
981 logical_type_test(
982 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
983 &Schema::TimestampMicros,
984 Value::TimestampMicros(1_i64),
985 &Schema::Long,
986 1_i64,
987 )
988 }
989
990 #[test]
991 fn decimal_fixed() -> TestResult {
992 let size = 30;
993 let inner = Schema::Fixed(FixedSchema {
994 name: Name::new("decimal")?,
995 aliases: None,
996 doc: None,
997 size,
998 default: None,
999 attributes: Default::default(),
1000 });
1001 let value = vec![0u8; size];
1002 logical_type_test(
1003 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1004 &Schema::Decimal(DecimalSchema {
1005 precision: 20,
1006 scale: 5,
1007 inner: Box::new(inner.clone()),
1008 }),
1009 Value::Decimal(Decimal::from(value.clone())),
1010 &inner,
1011 Value::Fixed(size, value),
1012 )
1013 }
1014
1015 #[test]
1016 fn decimal_bytes() -> TestResult {
1017 let inner = Schema::Bytes;
1018 let value = vec![0u8; 10];
1019 logical_type_test(
1020 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1021 &Schema::Decimal(DecimalSchema {
1022 precision: 4,
1023 scale: 3,
1024 inner: Box::new(inner.clone()),
1025 }),
1026 Value::Decimal(Decimal::from(value.clone())),
1027 &inner,
1028 value,
1029 )
1030 }
1031
1032 #[test]
1033 fn duration() -> TestResult {
1034 let inner = Schema::Fixed(FixedSchema {
1035 name: Name::new("duration")?,
1036 aliases: None,
1037 doc: None,
1038 size: 12,
1039 default: None,
1040 attributes: Default::default(),
1041 });
1042 let value = Value::Duration(Duration::new(
1043 Months::new(256),
1044 Days::new(512),
1045 Millis::new(1024),
1046 ));
1047 logical_type_test(
1048 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1049 &Schema::Duration,
1050 value,
1051 &inner,
1052 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1053 )
1054 }
1055
1056 #[test]
1057 fn test_writer_append() -> TestResult {
1058 let schema = Schema::parse_str(SCHEMA)?;
1059 let mut writer = Writer::new(&schema, Vec::new())?;
1060
1061 let mut record = Record::new(&schema).unwrap();
1062 record.put("a", 27i64);
1063 record.put("b", "foo");
1064
1065 let n1 = writer.append(record.clone())?;
1066 let n2 = writer.append(record.clone())?;
1067 let n3 = writer.flush()?;
1068 let result = writer.into_inner()?;
1069
1070 assert_eq!(n1 + n2 + n3, result.len());
1071
1072 let mut data = Vec::new();
1073 zig_i64(27, &mut data)?;
1074 zig_i64(3, &mut data)?;
1075 data.extend(b"foo");
1076 data.extend(data.clone());
1077
1078 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1080 let last_data_byte = result.len() - 16;
1082 assert_eq!(
1083 &result[last_data_byte - data.len()..last_data_byte],
1084 data.as_slice()
1085 );
1086
1087 Ok(())
1088 }
1089
1090 #[test]
1091 fn test_writer_extend() -> TestResult {
1092 let schema = Schema::parse_str(SCHEMA)?;
1093 let mut writer = Writer::new(&schema, Vec::new())?;
1094
1095 let mut record = Record::new(&schema).unwrap();
1096 record.put("a", 27i64);
1097 record.put("b", "foo");
1098 let record_copy = record.clone();
1099 let records = vec![record, record_copy];
1100
1101 let n1 = writer.extend(records)?;
1102 let n2 = writer.flush()?;
1103 let result = writer.into_inner()?;
1104
1105 assert_eq!(n1 + n2, result.len());
1106
1107 let mut data = Vec::new();
1108 zig_i64(27, &mut data)?;
1109 zig_i64(3, &mut data)?;
1110 data.extend(b"foo");
1111 data.extend(data.clone());
1112
1113 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1115 let last_data_byte = result.len() - 16;
1117 assert_eq!(
1118 &result[last_data_byte - data.len()..last_data_byte],
1119 data.as_slice()
1120 );
1121
1122 Ok(())
1123 }
1124
1125 #[derive(Debug, Clone, Deserialize, Serialize)]
1126 struct TestSerdeSerialize {
1127 a: i64,
1128 b: String,
1129 }
1130
1131 #[test]
1132 fn test_writer_append_ser() -> TestResult {
1133 let schema = Schema::parse_str(SCHEMA)?;
1134 let mut writer = Writer::new(&schema, Vec::new())?;
1135
1136 let record = TestSerdeSerialize {
1137 a: 27,
1138 b: "foo".to_owned(),
1139 };
1140
1141 let n1 = writer.append_ser(record)?;
1142 let n2 = writer.flush()?;
1143 let result = writer.into_inner()?;
1144
1145 assert_eq!(n1 + n2, result.len());
1146
1147 let mut data = Vec::new();
1148 zig_i64(27, &mut data)?;
1149 zig_i64(3, &mut data)?;
1150 data.extend(b"foo");
1151
1152 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1154 let last_data_byte = result.len() - 16;
1156 assert_eq!(
1157 &result[last_data_byte - data.len()..last_data_byte],
1158 data.as_slice()
1159 );
1160
1161 Ok(())
1162 }
1163
1164 #[test]
1165 fn test_writer_extend_ser() -> TestResult {
1166 let schema = Schema::parse_str(SCHEMA)?;
1167 let mut writer = Writer::new(&schema, Vec::new())?;
1168
1169 let record = TestSerdeSerialize {
1170 a: 27,
1171 b: "foo".to_owned(),
1172 };
1173 let record_copy = record.clone();
1174 let records = vec![record, record_copy];
1175
1176 let n1 = writer.extend_ser(records)?;
1177 let n2 = writer.flush()?;
1178 let result = writer.into_inner()?;
1179
1180 assert_eq!(n1 + n2, result.len());
1181
1182 let mut data = Vec::new();
1183 zig_i64(27, &mut data)?;
1184 zig_i64(3, &mut data)?;
1185 data.extend(b"foo");
1186 data.extend(data.clone());
1187
1188 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1190 let last_data_byte = result.len() - 16;
1192 assert_eq!(
1193 &result[last_data_byte - data.len()..last_data_byte],
1194 data.as_slice()
1195 );
1196
1197 Ok(())
1198 }
1199
1200 fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1201 Writer::with_codec(
1202 schema,
1203 Vec::new(),
1204 Codec::Deflate(DeflateSettings::default()),
1205 )
1206 }
1207
1208 fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1209 Writer::builder()
1210 .writer(Vec::new())
1211 .schema(schema)
1212 .codec(Codec::Deflate(DeflateSettings::default()))
1213 .block_size(100)
1214 .build()
1215 }
1216
1217 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1218 let mut record = Record::new(schema).unwrap();
1219 record.put("a", 27i64);
1220 record.put("b", "foo");
1221
1222 let n1 = writer.append(record.clone())?;
1223 let n2 = writer.append(record.clone())?;
1224 let n3 = writer.flush()?;
1225 let result = writer.into_inner()?;
1226
1227 assert_eq!(n1 + n2 + n3, result.len());
1228
1229 let mut data = Vec::new();
1230 zig_i64(27, &mut data)?;
1231 zig_i64(3, &mut data)?;
1232 data.extend(b"foo");
1233 data.extend(data.clone());
1234 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1235
1236 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1238 let last_data_byte = result.len() - 16;
1240 assert_eq!(
1241 &result[last_data_byte - data.len()..last_data_byte],
1242 data.as_slice()
1243 );
1244
1245 Ok(())
1246 }
1247
1248 #[test]
1249 fn test_writer_with_codec() -> TestResult {
1250 let schema = Schema::parse_str(SCHEMA)?;
1251 let writer = make_writer_with_codec(&schema)?;
1252 check_writer(writer, &schema)
1253 }
1254
1255 #[test]
1256 fn test_writer_with_builder() -> TestResult {
1257 let schema = Schema::parse_str(SCHEMA)?;
1258 let writer = make_writer_with_builder(&schema)?;
1259 check_writer(writer, &schema)
1260 }
1261
1262 #[test]
1263 fn test_logical_writer() -> TestResult {
1264 const LOGICAL_TYPE_SCHEMA: &str = r#"
1265 {
1266 "type": "record",
1267 "name": "logical_type_test",
1268 "fields": [
1269 {
1270 "name": "a",
1271 "type": [
1272 "null",
1273 {
1274 "type": "long",
1275 "logicalType": "timestamp-micros"
1276 }
1277 ]
1278 }
1279 ]
1280 }
1281 "#;
1282 let codec = Codec::Deflate(DeflateSettings::default());
1283 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1284 let mut writer = Writer::builder()
1285 .schema(&schema)
1286 .codec(codec)
1287 .writer(Vec::new())
1288 .build()?;
1289
1290 let mut record1 = Record::new(&schema).unwrap();
1291 record1.put(
1292 "a",
1293 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1294 );
1295
1296 let mut record2 = Record::new(&schema).unwrap();
1297 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1298
1299 let n1 = writer.append(record1)?;
1300 let n2 = writer.append(record2)?;
1301 let n3 = writer.flush()?;
1302 let result = writer.into_inner()?;
1303
1304 assert_eq!(n1 + n2 + n3, result.len());
1305
1306 let mut data = Vec::new();
1307 zig_i64(1, &mut data)?;
1309 zig_i64(1234, &mut data)?;
1310
1311 zig_i64(0, &mut data)?;
1313 codec.compress(&mut data)?;
1314
1315 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1317 let last_data_byte = result.len() - 16;
1319 assert_eq!(
1320 &result[last_data_byte - data.len()..last_data_byte],
1321 data.as_slice()
1322 );
1323
1324 Ok(())
1325 }
1326
1327 #[test]
1328 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1329 let schema = Schema::parse_str(SCHEMA)?;
1330 let mut writer = Writer::new(&schema, Vec::new())?;
1331
1332 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1333 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1334 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1335 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1336
1337 let mut record = Record::new(&schema).unwrap();
1338 record.put("a", 27i64);
1339 record.put("b", "foo");
1340
1341 writer.append(record.clone())?;
1342 writer.append(record.clone())?;
1343 writer.flush()?;
1344 let result = writer.into_inner()?;
1345
1346 assert_eq!(result.len(), 244);
1347
1348 Ok(())
1349 }
1350
1351 #[test]
1352 fn test_avro_3881_metadata_empty_body() -> TestResult {
1353 let schema = Schema::parse_str(SCHEMA)?;
1354 let mut writer = Writer::new(&schema, Vec::new())?;
1355 writer.add_user_metadata("a".to_string(), "b")?;
1356 let result = writer.into_inner()?;
1357
1358 let reader = Reader::with_schema(&schema, &result[..])?;
1359 let mut expected = HashMap::new();
1360 expected.insert("a".to_string(), vec![b'b']);
1361 assert_eq!(reader.user_metadata(), &expected);
1362 assert_eq!(reader.into_iter().count(), 0);
1363
1364 Ok(())
1365 }
1366
1367 #[test]
1368 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1369 let schema = Schema::parse_str(SCHEMA)?;
1370 let mut writer = Writer::new(&schema, Vec::new())?;
1371
1372 let mut record = Record::new(&schema).unwrap();
1373 record.put("a", 27i64);
1374 record.put("b", "foo");
1375 writer.append(record.clone())?;
1376
1377 match writer
1378 .add_user_metadata("stringKey".to_string(), String::from("value2"))
1379 .map_err(Error::into_details)
1380 {
1381 Err(e @ Details::FileHeaderAlreadyWritten) => {
1382 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1383 }
1384 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1385 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1386 }
1387
1388 Ok(())
1389 }
1390
/// Verifies that a user metadata key starting with `avro.` is rejected with
/// `Details::InvalidMetadataKey` and the expected display text.
#[test]
fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let key = String::from("avro.stringKey");
    // Built up front so the match below only has to compare.
    let expected_message =
        format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}.");

    let outcome = writer
        .add_user_metadata(key.clone(), "value")
        .map_err(Error::into_details);

    match outcome {
        Ok(_) => {
            panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
        }
        Err(e @ Details::InvalidMetadataKey(_)) => {
            assert_eq!(e.to_string(), expected_message)
        }
        Err(e) => panic!(
            "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
        ),
    }

    Ok(())
}
1419
/// Verifies that metadata handed to the builder via `user_metadata` ends up
/// unchanged in the constructed writer's `user_metadata` field.
#[test]
fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;

    // One string value and two byte values.
    let user_meta_data: HashMap<String, Value> = HashMap::from([
        (
            "stringKey".to_string(),
            Value::String("stringValue".to_string()),
        ),
        ("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec())),
        ("vecKey".to_string(), Value::Bytes(vec![1, 2, 3])),
    ]);

    let writer: Writer<'_, Vec<u8>> = Writer::builder()
        .schema(&schema)
        .writer(Vec::new())
        .user_metadata(user_meta_data.clone())
        .build()?;

    assert_eq!(writer.user_metadata, user_meta_data);

    Ok(())
}
1442
1443 #[derive(Serialize, Clone)]
1444 struct TestSingleObjectWriter {
1445 a: i64,
1446 b: f64,
1447 c: Vec<String>,
1448 }
1449
1450 impl AvroSchema for TestSingleObjectWriter {
1451 fn get_schema() -> Schema {
1452 let schema = r#"
1453 {
1454 "type":"record",
1455 "name":"TestSingleObjectWrtierSerialize",
1456 "fields":[
1457 {
1458 "name":"a",
1459 "type":"long"
1460 },
1461 {
1462 "name":"b",
1463 "type":"double"
1464 },
1465 {
1466 "name":"c",
1467 "type":{
1468 "type":"array",
1469 "items":"string"
1470 }
1471 }
1472 ]
1473 }
1474 "#;
1475 Schema::parse_str(schema).unwrap()
1476 }
1477 }
1478
1479 impl From<TestSingleObjectWriter> for Value {
1480 fn from(obj: TestSingleObjectWriter) -> Value {
1481 Value::Record(vec![
1482 ("a".into(), obj.a.into()),
1483 ("b".into(), obj.b.into()),
1484 (
1485 "c".into(),
1486 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1487 ),
1488 ])
1489 }
1490 }
1491
/// Checks the bytes produced by `GenericSingleObjectWriter::write_value_ref`:
/// the two leading bytes `0xC3 0x01`, then the schema's Rabin fingerprint in
/// bytes 2..10, then the same bytes that `encode` produces for the value.
#[test]
fn test_single_object_writer() -> TestResult {
    let schema = TestSingleObjectWriter::get_schema();
    let value = Value::from(TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    });

    let mut writer = GenericSingleObjectWriter::new_with_capacity(&schema, 1024)
        .expect("Should resolve schema");

    let mut buf: Vec<u8> = Vec::new();
    let written_bytes = writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    assert!(buf.len() > 10, "no bytes written");
    assert_eq!(buf.len(), written_bytes);

    // Two leading bytes.
    assert_eq!(buf[0], 0xC3);
    assert_eq!(buf[1], 0x01);

    // Schema fingerprint.
    let fingerprint = schema.fingerprint::<Rabin>();
    assert_eq!(&buf[2..10], &fingerprint.bytes[..]);

    // The remainder must equal the plain encoding of the same value.
    let mut msg_binary = Vec::new();
    encode(&value, &schema, &mut msg_binary)
        .expect("encode should have failed by here as a dependency of any writing");
    assert_eq!(&buf[10..], &msg_binary[..]);

    Ok(())
}
1531
/// Checks that a writer built with a `GlueSchemaUuidHeader` starts its output
/// with the bytes `0x03 0x00` followed by the 16 bytes of the schema UUID.
#[test]
fn test_single_object_writer_with_header_builder() -> TestResult {
    let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;

    let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
        &TestSingleObjectWriter::get_schema(),
        1024,
        GlueSchemaUuidHeader::from_uuid(schema_uuid),
    )
    .expect("Should resolve schema");

    let value = Value::from(TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    });

    let mut buf: Vec<u8> = Vec::new();
    writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    assert_eq!(buf[0], 0x03);
    assert_eq!(buf[1], 0x00);
    assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
    Ok(())
}
1558
/// Checks that `SpecificSingleObjectWriter::write`,
/// `SpecificSingleObjectWriter::write_value` and
/// `GenericSingleObjectWriter::write_value` all emit identical bytes for the
/// same object.
#[test]
fn test_writer_parity() -> TestResult {
    let obj1 = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
        &TestSingleObjectWriter::get_schema(),
        1024,
    )
    .expect("Should resolve schema");
    let mut specific_writer =
        SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
            .expect("Resolved should pass");

    // Specific writer, serde-based path.
    let mut buf1: Vec<u8> = Vec::new();
    specific_writer
        .write(obj1.clone(), &mut buf1)
        .expect("Serialization expected");

    // Specific writer, `Value`-based path.
    let mut buf2: Vec<u8> = Vec::new();
    specific_writer
        .write_value(obj1.clone(), &mut buf2)
        .expect("Serialization expected");

    // Generic writer; the object is consumed here.
    let mut buf3: Vec<u8> = Vec::new();
    generic_writer
        .write_value(Value::from(obj1), &mut buf3)
        .expect("Serialization expected");

    assert_eq!(buf1, buf2);
    assert_eq!(buf1, buf3);

    Ok(())
}
1593
/// Serializes a struct whose field is named `time` against a schema whose
/// field is named `date` but lists `time` among its aliases, and checks the
/// byte count returned by `append_ser`.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
    #[derive(Clone, Debug, Eq, PartialEq, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<i64>,
    }

    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let bytes = writer.append_ser(Conference {
        name: "RustConf".to_string(),
        time: Some(1234567890),
    })?;

    assert_eq!(182, bytes);

    Ok(())
}
1626
/// Serializes a struct whose `time` field is `Option<f64>` against a schema
/// that declares the aliased field as `["null", "long"]`, and checks the full
/// text of the resulting error.
#[test]
fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
    #[derive(Clone, Debug, PartialEq, Serialize)]
    pub struct Conference {
        pub name: String,
        // `f64` on purpose: the schema only allows null or long here.
        pub time: Option<f64>,
    }

    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let conf = Conference {
        name: "RustConf".to_string(),
        time: Some(12345678.90),
    };

    let e = match writer.append_ser(conf) {
        Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
        Err(e) => e,
    };

    assert_eq!(
        e.to_string(),
        r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
    );

    Ok(())
}
1664
/// Wraps a shared in-memory buffer in a `std::io::BufWriter`, appends one
/// record, calls `Writer::flush`, and checks that all 151 bytes are visible
/// in the shared buffer afterwards.
#[test]
fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
    /// Cloneable handle to a shared byte vector, so the test can inspect
    /// what was written after handing a clone to the writer.
    #[derive(Clone, Default)]
    struct TestBuffer(Rc<RefCell<Vec<u8>>>);

    impl TestBuffer {
        /// Number of bytes currently stored in the shared vector.
        fn len(&self) -> usize {
            let bytes = self.0.borrow();
            bytes.len()
        }
    }

    impl Write for TestBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut bytes = self.0.borrow_mut();
            bytes.write(buf)
        }

        // Nothing to do: `write` stores the bytes immediately.
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    const SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "ExampleSchema",
        "fields": [
            {"name": "exampleField", "type": "string"}
        ]
    }
    "#;

    let schema = Schema::parse_str(SCHEMA)?;

    let shared_buffer = TestBuffer::default();
    let mut writer = Writer::new(
        &schema,
        std::io::BufWriter::new(shared_buffer.clone()),
    )?;

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("exampleField", "value");

    writer.append(record)?;
    writer.flush()?;

    let flushed_len = shared_buffer.len();
    assert_eq!(
        flushed_len,
        151,
        "the test buffer was not fully written to after Writer::flush was called"
    );

    Ok(())
}
1718}