1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 ser_schema::SchemaAwareWriteSerializer,
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default for `Writer`'s `block_size` setting: once the in-memory buffer holds
/// at least this many bytes, the pending values are flushed as one block.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Magic bytes placed at the very start of the header built by `Writer::header`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Writes Avro values that conform to `schema` into the wrapped `writer`.
///
/// Values are encoded into an in-memory buffer first; the buffer is emitted as
/// a block when it reaches `block_size` bytes or when `flush` is called.
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is validated and encoded against.
    schema: &'a Schema,
    /// Destination of the header and the data blocks.
    writer: W,
    /// Names resolved from the schema (or from the extra schemata, when given).
    resolved_schema: ResolvedSchema<'a>,
    /// Codec applied to the buffer right before a block is written.
    codec: Codec,
    /// Buffer size (in bytes) at which an append triggers a flush.
    block_size: usize,
    /// Encoded values that have not been written out yet.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// Sync marker written at the end of the header and after every block.
    marker: [u8; 16],
    /// `true` once the header has been written (or was declared as already present).
    has_header: bool,
    /// Additional key/value pairs stored in the header metadata.
    user_metadata: HashMap<String, Value>,
}
53
#[bon::bon]
impl<'a, W: Write> Writer<'a, W> {
    /// Builds a `Writer` from its individual settings.
    ///
    /// Names are resolved from `schemata` when it is provided, otherwise from
    /// `schema` alone. The internal buffer is pre-allocated with `block_size`
    /// bytes of capacity.
    ///
    /// # Errors
    ///
    /// Fails if the schema(s) cannot be converted into a `ResolvedSchema`.
    #[builder]
    pub fn builder(
        schema: &'a Schema,
        schemata: Option<Vec<&'a Schema>>,
        writer: W,
        #[builder(default = Codec::Null)] codec: Codec,
        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
        #[builder(default = generate_sync_marker())] marker: [u8; 16],
        // When `true` the writer never emits a header itself; the `append_to*`
        // constructors set this.
        #[builder(default = false)]
        has_header: bool,
        #[builder(default)] user_metadata: HashMap<String, Value>,
    ) -> AvroResult<Self> {
        // Prefer the explicit list of schemata for name resolution when present.
        let resolved_schema = if let Some(schemata) = schemata {
            ResolvedSchema::try_from(schemata)?
        } else {
            ResolvedSchema::try_from(schema)?
        };
        Ok(Self {
            schema,
            writer,
            resolved_schema,
            codec,
            block_size,
            buffer: Vec::with_capacity(block_size),
            num_values: 0,
            marker,
            has_header,
            user_metadata,
        })
    }
}
90
91impl<'a, W: Write> Writer<'a, W> {
92 pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96 Writer::with_codec(schema, writer, Codec::Null)
97 }
98
99 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102 Self::builder()
103 .schema(schema)
104 .writer(writer)
105 .codec(codec)
106 .build()
107 }
108
109 pub fn with_schemata(
114 schema: &'a Schema,
115 schemata: Vec<&'a Schema>,
116 writer: W,
117 codec: Codec,
118 ) -> AvroResult<Self> {
119 Self::builder()
120 .schema(schema)
121 .schemata(schemata)
122 .writer(writer)
123 .codec(codec)
124 .build()
125 }
126
127 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132 }
133
134 pub fn append_to_with_codec(
137 schema: &'a Schema,
138 writer: W,
139 codec: Codec,
140 marker: [u8; 16],
141 ) -> AvroResult<Self> {
142 Self::builder()
143 .schema(schema)
144 .writer(writer)
145 .codec(codec)
146 .marker(marker)
147 .has_header(true)
148 .build()
149 }
150
151 pub fn append_to_with_codec_schemata(
154 schema: &'a Schema,
155 schemata: Vec<&'a Schema>,
156 writer: W,
157 codec: Codec,
158 marker: [u8; 16],
159 ) -> AvroResult<Self> {
160 Self::builder()
161 .schema(schema)
162 .schemata(schemata)
163 .writer(writer)
164 .codec(codec)
165 .marker(marker)
166 .has_header(true)
167 .build()
168 }
169
170 pub fn schema(&self) -> &'a Schema {
172 self.schema
173 }
174
175 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184 let n = self.maybe_write_header()?;
185
186 let avro = value.into();
187 self.append_value_ref(&avro).map(|m| m + n)
188 }
189
190 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198 let n = self.maybe_write_header()?;
199
200 write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201 self.num_values += 1;
202
203 if self.buffer.len() >= self.block_size {
204 return self.flush().map(|b| b + n);
205 }
206
207 Ok(n)
208 }
209
210 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220 let n = self.maybe_write_header()?;
221
222 let mut serializer = SchemaAwareWriteSerializer::new(
223 &mut self.buffer,
224 self.schema,
225 self.resolved_schema.get_names(),
226 None,
227 );
228 value.serialize(&mut serializer)?;
229 self.num_values += 1;
230
231 if self.buffer.len() >= self.block_size {
232 return self.flush().map(|b| b + n);
233 }
234
235 Ok(n)
236 }
237
238 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246 where
247 I: IntoIterator<Item = T>,
248 {
249 let mut num_bytes = 0;
264 for value in values {
265 num_bytes += self.append(value)?;
266 }
267 num_bytes += self.flush()?;
268
269 Ok(num_bytes)
270 }
271
272 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281 where
282 I: IntoIterator<Item = T>,
283 {
284 let mut num_bytes = 0;
299 for value in values {
300 num_bytes += self.append_ser(value)?;
301 }
302 num_bytes += self.flush()?;
303
304 Ok(num_bytes)
305 }
306
307 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315 let mut num_bytes = 0;
316 for value in values {
317 num_bytes += self.append_value_ref(value)?;
318 }
319 num_bytes += self.flush()?;
320
321 Ok(num_bytes)
322 }
323
324 pub fn flush(&mut self) -> AvroResult<usize> {
332 let mut num_bytes = self.maybe_write_header()?;
333 if self.num_values == 0 {
334 return Ok(num_bytes);
335 }
336
337 self.codec.compress(&mut self.buffer)?;
338
339 let num_values = self.num_values;
340 let stream_len = self.buffer.len();
341
342 num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343 + self.append_raw(&stream_len.into(), &Schema::Long)?
344 + self
345 .writer
346 .write(self.buffer.as_ref())
347 .map_err(Details::WriteBytes)?
348 + self.append_marker()?;
349
350 self.buffer.clear();
351 self.num_values = 0;
352
353 self.writer.flush().map_err(Details::FlushWriter)?;
354
355 Ok(num_bytes)
356 }
357
/// Flushes any pending data and returns the underlying writer, consuming `self`.
///
/// # Errors
///
/// Fails if writing the header or flushing the pending block fails.
pub fn into_inner(mut self) -> AvroResult<W> {
    self.maybe_write_header()?;
    self.flush()?;

    // `Writer` implements `Drop`, so `writer` cannot simply be moved out of
    // `self`. Wrapping `self` in `ManuallyDrop` suppresses that destructor; the
    // owned fields are released by hand below instead.
    let mut this = ManuallyDrop::new(self);

    // Move the collections out (leaving empty defaults behind) so they are
    // dropped normally when these bindings go out of scope.
    let _buffer = std::mem::take(&mut this.buffer);
    let _user_metadata = std::mem::take(&mut this.user_metadata);
    // SAFETY: `this` is wrapped in `ManuallyDrop` and `resolved_schema` is not
    // accessed again, so it is dropped exactly once, here.
    unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };

    // SAFETY: `this.writer` is a valid, initialized `W`. Because `this` is never
    // dropped, ownership moves to the returned value without a double drop.
    let writer = unsafe { std::ptr::read(&this.writer) };

    Ok(writer)
}
379
380 pub fn get_ref(&self) -> &W {
385 &self.writer
386 }
387
388 pub fn get_mut(&mut self) -> &mut W {
395 &mut self.writer
396 }
397
398 fn append_marker(&mut self) -> AvroResult<usize> {
400 self.writer
403 .write(&self.marker)
404 .map_err(|e| Details::WriteMarker(e).into())
405 }
406
407 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410 }
411
412 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414 self.writer
415 .write(bytes)
416 .map_err(|e| Details::WriteBytes(e).into())
417 }
418
419 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422 if !self.has_header {
423 if key.starts_with("avro.") {
424 return Err(Details::InvalidMetadataKey(key).into());
425 }
426 self.user_metadata
427 .insert(key, Value::Bytes(value.as_ref().to_vec()));
428 Ok(())
429 } else {
430 Err(Details::FileHeaderAlreadyWritten.into())
431 }
432 }
433
434 fn header(&self) -> Result<Vec<u8>, Error> {
436 let schema_bytes = serde_json::to_string(self.schema)
437 .map_err(Details::ConvertJsonToString)?
438 .into_bytes();
439
440 let mut metadata = HashMap::with_capacity(2);
441 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
442 metadata.insert("avro.codec", self.codec.into());
443 match self.codec {
444 #[cfg(feature = "bzip")]
445 Codec::Bzip2(settings) => {
446 metadata.insert(
447 "avro.codec.compression_level",
448 Value::Bytes(vec![settings.compression_level]),
449 );
450 }
451 #[cfg(feature = "xz")]
452 Codec::Xz(settings) => {
453 metadata.insert(
454 "avro.codec.compression_level",
455 Value::Bytes(vec![settings.compression_level]),
456 );
457 }
458 #[cfg(feature = "zstandard")]
459 Codec::Zstandard(settings) => {
460 metadata.insert(
461 "avro.codec.compression_level",
462 Value::Bytes(vec![settings.compression_level]),
463 );
464 }
465 _ => {}
466 }
467
468 for (k, v) in &self.user_metadata {
469 metadata.insert(k.as_str(), v.clone());
470 }
471
472 let mut header = Vec::new();
473 header.extend_from_slice(AVRO_OBJECT_HEADER);
474 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
475 header.extend_from_slice(&self.marker);
476
477 Ok(header)
478 }
479
480 fn maybe_write_header(&mut self) -> AvroResult<usize> {
481 if !self.has_header {
482 let header = self.header()?;
483 let n = self.append_bytes(header.as_ref())?;
484 self.has_header = true;
485 Ok(n)
486 } else {
487 Ok(0)
488 }
489 }
490}
491
/// Best-effort finalization: writes the header (if still missing) and flushes
/// any buffered values when the writer goes out of scope.
impl<W: Write> Drop for Writer<'_, W> {
    fn drop(&mut self) {
        // Errors are deliberately ignored here: `drop` cannot return them.
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
499
500fn write_avro_datum<T: Into<Value>, W: Write>(
506 schema: &Schema,
507 value: T,
508 writer: &mut W,
509) -> Result<(), Error> {
510 let avro = value.into();
511 if !avro.validate(schema) {
512 return Err(Details::Validation.into());
513 }
514 encode(&avro, schema, writer)?;
515 Ok(())
516}
517
518fn write_avro_datum_schemata<T: Into<Value>>(
519 schema: &Schema,
520 schemata: Vec<&Schema>,
521 value: T,
522 buffer: &mut Vec<u8>,
523) -> AvroResult<usize> {
524 let avro = value.into();
525 let rs = ResolvedSchema::try_from(schemata)?;
526 let names = rs.get_names();
527 let enclosing_namespace = schema.namespace();
528 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
529 return Err(Details::Validation.into());
530 }
531 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
532}
533
/// Writer that emits each value as a header followed by its encoded datum.
pub struct GenericSingleObjectWriter {
    /// Holds the pre-built header; a value is encoded after it for each write
    /// and the buffer is truncated back to the header afterwards.
    buffer: Vec<u8>,
    /// Owned schema together with its resolved names.
    resolved: ResolvedOwnedSchema,
}
541
542impl GenericSingleObjectWriter {
543 pub fn new_with_capacity(
544 schema: &Schema,
545 initial_buffer_cap: usize,
546 ) -> AvroResult<GenericSingleObjectWriter> {
547 let header_builder = RabinFingerprintHeader::from_schema(schema);
548 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
549 }
550
551 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
552 schema: &Schema,
553 initial_buffer_cap: usize,
554 header_builder: HB,
555 ) -> AvroResult<GenericSingleObjectWriter> {
556 let mut buffer = Vec::with_capacity(initial_buffer_cap);
557 let header = header_builder.build_header();
558 buffer.extend_from_slice(&header);
559
560 Ok(GenericSingleObjectWriter {
561 buffer,
562 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
563 })
564 }
565
566 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
567
568 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
570 let original_length = self.buffer.len();
571 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
572 Err(Details::IllegalSingleObjectWriterState.into())
573 } else {
574 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
575 writer
576 .write_all(&self.buffer)
577 .map_err(Details::WriteBytes)?;
578 let len = self.buffer.len();
579 self.buffer.truncate(original_length);
580 Ok(len)
581 }
582 }
583
584 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
586 self.write_value_ref(&v, writer)
587 }
588}
589
/// Single-object writer bound to a Rust type `T` that provides its own Avro
/// schema through `AvroSchema`.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Generic writer holding the pre-built header and the resolved schema.
    inner: GenericSingleObjectWriter,
    /// Schema obtained from `T::get_schema()`.
    schema: Schema,
    /// Set once `write_ref` has emitted the header.
    header_written: bool,
    /// Ties the writer to `T` without storing a value of it.
    _model: PhantomData<T>,
}
600
601impl<T> SpecificSingleObjectWriter<T>
602where
603 T: AvroSchema,
604{
605 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
606 let schema = T::get_schema();
607 Ok(SpecificSingleObjectWriter {
608 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
609 schema,
610 header_written: false,
611 _model: PhantomData,
612 })
613 }
614}
615
616impl<T> SpecificSingleObjectWriter<T>
617where
618 T: AvroSchema + Into<Value>,
619{
620 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
623 let v: Value = data.into();
624 self.inner.write_value_ref(&v, writer)
625 }
626}
627
628impl<T> SpecificSingleObjectWriter<T>
629where
630 T: AvroSchema + Serialize,
631{
632 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
635 let mut bytes_written: usize = 0;
636
637 if !self.header_written {
638 bytes_written += writer
639 .write(self.inner.buffer.as_slice())
640 .map_err(Details::WriteBytes)?;
641 self.header_written = true;
642 }
643
644 bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
645
646 Ok(bytes_written)
647 }
648
649 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
652 self.write_ref(&data, writer)
653 }
654}
655
656fn write_value_ref_resolved(
657 schema: &Schema,
658 resolved_schema: &ResolvedSchema,
659 value: &Value,
660 buffer: &mut Vec<u8>,
661) -> AvroResult<usize> {
662 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
663 Some(reason) => Err(Details::ValidationWithReason {
664 value: value.clone(),
665 schema: schema.clone(),
666 reason,
667 }
668 .into()),
669 None => encode_internal(
670 value,
671 schema,
672 resolved_schema.get_names(),
673 &schema.namespace(),
674 buffer,
675 ),
676 }
677}
678
679fn write_value_ref_owned_resolved(
680 resolved_schema: &ResolvedOwnedSchema,
681 value: &Value,
682 buffer: &mut Vec<u8>,
683) -> AvroResult<()> {
684 let root_schema = resolved_schema.get_root_schema();
685 if let Some(reason) = value.validate_internal(
686 root_schema,
687 resolved_schema.get_names(),
688 &root_schema.namespace(),
689 ) {
690 return Err(Details::ValidationWithReason {
691 value: value.clone(),
692 schema: root_schema.clone(),
693 reason,
694 }
695 .into());
696 }
697 encode_internal(
698 value,
699 root_schema,
700 resolved_schema.get_names(),
701 &root_schema.namespace(),
702 buffer,
703 )?;
704 Ok(())
705}
706
707pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
714 let mut buffer = Vec::new();
715 write_avro_datum(schema, value, &mut buffer)?;
716 Ok(buffer)
717}
718
719pub fn write_avro_datum_ref<T: Serialize, W: Write>(
726 schema: &Schema,
727 data: &T,
728 writer: &mut W,
729) -> AvroResult<usize> {
730 let names: HashMap<Name, &Schema> = HashMap::new();
731 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
732 let bytes_written = data.serialize(&mut serializer)?;
733 Ok(bytes_written)
734}
735
736pub fn to_avro_datum_schemata<T: Into<Value>>(
741 schema: &Schema,
742 schemata: Vec<&Schema>,
743 value: T,
744) -> AvroResult<Vec<u8>> {
745 let mut buffer = Vec::new();
746 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
747 Ok(buffer)
748}
749
750#[cfg(not(target_arch = "wasm32"))]
751fn generate_sync_marker() -> [u8; 16] {
752 let mut marker = [0_u8; 16];
753 std::iter::repeat_with(rand::random)
754 .take(16)
755 .enumerate()
756 .for_each(|(i, n)| marker[i] = n);
757 marker
758}
759
/// Generates a random 16-byte sync marker (wasm32 target) from four random
/// numbers, each stored as 4 big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
770
771#[cfg(test)]
772mod tests {
773 use std::{cell::RefCell, rc::Rc};
774
775 use super::*;
776 use crate::{
777 Reader,
778 decimal::Decimal,
779 duration::{Days, Duration, Millis, Months},
780 headers::GlueSchemaUuidHeader,
781 rabin::Rabin,
782 schema::{DecimalSchema, FixedSchema, Name},
783 types::Record,
784 util::zig_i64,
785 };
786 use pretty_assertions::assert_eq;
787 use serde::{Deserialize, Serialize};
788 use uuid::Uuid;
789
790 use crate::{codec::DeflateSettings, error::Details};
791 use apache_avro_test_helper::TestResult;
792
// Length of the magic bytes at the start of every container file.
const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

// Record schema shared by most tests: a long `a` (default 42) and a string `b`.
const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

// Nullable long, used by the union tests.
const UNION_SCHEMA: &str = r#"["null", "long"]"#;
814
/// A record datum is encoded as its fields in order: zig-zag long, then the
/// length-prefixed string.
#[test]
fn test_to_avro_datum() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;

    let mut expected: Vec<u8> = Vec::new();
    zig_i64(27, &mut expected)?;
    zig_i64(3, &mut expected)?;
    expected.extend_from_slice(b"foo");

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    let encoded = to_avro_datum(&schema, record)?;
    assert_eq!(encoded, expected);

    Ok(())
}
831
/// `write_avro_datum_ref` serializes a serde struct and reports the number of
/// bytes it produced.
#[test]
fn avro_rs_193_write_avro_datum_ref() -> TestResult {
    #[derive(Serialize)]
    struct TestStruct {
        a: i64,
        b: String,
    }

    let schema = Schema::parse_str(SCHEMA)?;
    let data = TestStruct {
        a: 27,
        b: String::from("foo"),
    };

    let mut expected: Vec<u8> = Vec::new();
    zig_i64(27, &mut expected)?;
    zig_i64(3, &mut expected)?;
    expected.extend_from_slice(b"foo");

    let mut sink: Vec<u8> = Vec::new();
    let written = write_avro_datum_ref(&schema, &data, &mut sink)?;

    assert_eq!(written, expected.len());
    assert_eq!(sink, expected);

    Ok(())
}
859
/// `flush` writes the header even without data, unless the writer was told the
/// header already exists.
#[test]
fn avro_rs_220_flush_write_header() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;

    // Fresh writer: flushing emits the header only.
    let mut fresh = Writer::new(&schema, Vec::new())?;
    fresh.flush()?;
    let with_header = fresh.into_inner()?;
    assert_eq!(with_header.len(), 163);

    // Writer that believes the header is already there: nothing is written.
    let mut appending = Writer::builder()
        .has_header(true)
        .schema(&schema)
        .writer(Vec::new())
        .build()?;
    appending.flush()?;
    let without_header = appending.into_inner()?;
    assert_eq!(without_header.len(), 0);

    Ok(())
}
882
/// A non-null union value is encoded as the branch index followed by the value.
#[test]
fn test_union_not_null() -> TestResult {
    let schema = Schema::parse_str(UNION_SCHEMA)?;

    let mut expected: Vec<u8> = Vec::new();
    zig_i64(1, &mut expected)?;
    zig_i64(3, &mut expected)?;

    let value = Value::Union(1, Box::new(Value::Long(3)));
    let encoded = to_avro_datum(&schema, value)?;
    assert_eq!(encoded, expected);

    Ok(())
}
896
/// A null union value is encoded as just the branch index.
#[test]
fn test_union_null() -> TestResult {
    let schema = Schema::parse_str(UNION_SCHEMA)?;

    let mut expected: Vec<u8> = Vec::new();
    zig_i64(0, &mut expected)?;

    let value = Value::Union(0, Box::new(Value::Null));
    let encoded = to_avro_datum(&schema, value)?;
    assert_eq!(encoded, expected);

    Ok(())
}
909
910 fn logical_type_test<T: Into<Value> + Clone>(
911 schema_str: &'static str,
912
913 expected_schema: &Schema,
914 value: Value,
915
916 raw_schema: &Schema,
917 raw_value: T,
918 ) -> TestResult {
919 let schema = Schema::parse_str(schema_str)?;
920 assert_eq!(&schema, expected_schema);
921 let ser = to_avro_datum(&schema, value.clone())?;
923 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
924 assert_eq!(ser, raw_ser);
925
926 let mut r = ser.as_slice();
928 let de = crate::from_avro_datum(&schema, &mut r, None)?;
929 assert_eq!(de, value);
930 Ok(())
931 }
932
/// `date` is carried by an `int`.
#[test]
fn date() -> TestResult {
    let schema_json = r#"{"type": "int", "logicalType": "date"}"#;
    logical_type_test(
        schema_json,
        &Schema::Date,
        Value::Date(1_i32),
        &Schema::Int,
        1_i32,
    )
}
943
/// `time-millis` is carried by an `int`.
#[test]
fn time_millis() -> TestResult {
    let schema_json = r#"{"type": "int", "logicalType": "time-millis"}"#;
    logical_type_test(
        schema_json,
        &Schema::TimeMillis,
        Value::TimeMillis(1_i32),
        &Schema::Int,
        1_i32,
    )
}
954
/// `time-micros` is carried by a `long`.
#[test]
fn time_micros() -> TestResult {
    let schema_json = r#"{"type": "long", "logicalType": "time-micros"}"#;
    logical_type_test(
        schema_json,
        &Schema::TimeMicros,
        Value::TimeMicros(1_i64),
        &Schema::Long,
        1_i64,
    )
}
965
/// `timestamp-millis` is carried by a `long`.
#[test]
fn timestamp_millis() -> TestResult {
    let schema_json = r#"{"type": "long", "logicalType": "timestamp-millis"}"#;
    logical_type_test(
        schema_json,
        &Schema::TimestampMillis,
        Value::TimestampMillis(1_i64),
        &Schema::Long,
        1_i64,
    )
}
976
/// `timestamp-micros` is carried by a `long`.
#[test]
fn timestamp_micros() -> TestResult {
    let schema_json = r#"{"type": "long", "logicalType": "timestamp-micros"}"#;
    logical_type_test(
        schema_json,
        &Schema::TimestampMicros,
        Value::TimestampMicros(1_i64),
        &Schema::Long,
        1_i64,
    )
}
987
/// A decimal backed by a `fixed` encodes like the plain fixed value.
#[test]
fn decimal_fixed() -> TestResult {
    let size = 30;
    let fixed_schema = Schema::Fixed(FixedSchema {
        name: Name::new("decimal")?,
        aliases: None,
        doc: None,
        size,
        default: None,
        attributes: Default::default(),
    });
    let decimal_schema = Schema::Decimal(DecimalSchema {
        precision: 20,
        scale: 5,
        inner: Box::new(fixed_schema.clone()),
    });
    let bytes = vec![0u8; size];

    logical_type_test(
        r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
        &decimal_schema,
        Value::Decimal(Decimal::from(bytes.clone())),
        &fixed_schema,
        Value::Fixed(size, bytes),
    )
}
1012
/// A decimal backed by `bytes` encodes like the plain bytes value.
#[test]
fn decimal_bytes() -> TestResult {
    let bytes_schema = Schema::Bytes;
    let decimal_schema = Schema::Decimal(DecimalSchema {
        precision: 4,
        scale: 3,
        inner: Box::new(bytes_schema.clone()),
    });
    let bytes = vec![0u8; 10];

    logical_type_test(
        r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
        &decimal_schema,
        Value::Decimal(Decimal::from(bytes.clone())),
        &bytes_schema,
        bytes,
    )
}
1029
/// A duration encodes like a 12-byte fixed holding months, days and millis.
#[test]
fn duration() -> TestResult {
    let fixed_schema = Schema::Fixed(FixedSchema {
        name: Name::new("duration")?,
        aliases: None,
        doc: None,
        size: 12,
        default: None,
        attributes: Default::default(),
    });

    let months = Months::new(256);
    let days = Days::new(512);
    let millis = Millis::new(1024);
    let logical_value = Value::Duration(Duration::new(months, days, millis));
    let raw_value = Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]);

    logical_type_test(
        r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
        &Schema::Duration,
        logical_value,
        &fixed_schema,
        raw_value,
    )
}
1053
/// Two appended records end up, uncompressed, right before the trailing sync
/// marker, and the reported byte counts add up to the output size.
#[test]
fn test_writer_append() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    let first = writer.append(record.clone())?;
    let second = writer.append(record.clone())?;
    let flushed = writer.flush()?;
    let output = writer.into_inner()?;

    assert_eq!(first + second + flushed, output.len());

    // Encoding of one record, repeated for the two appended records.
    let mut one_record: Vec<u8> = Vec::new();
    zig_i64(27, &mut one_record)?;
    zig_i64(3, &mut one_record)?;
    one_record.extend_from_slice(b"foo");
    let payload = one_record.repeat(2);

    assert_eq!(&output[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
    // The last 16 bytes are the sync marker; the payload sits right before it.
    let payload_end = output.len() - 16;
    assert_eq!(
        &output[payload_end - payload.len()..payload_end],
        payload.as_slice()
    );

    Ok(())
}
1087
/// `extend` writes all records; they end up right before the trailing sync
/// marker and the reported byte counts add up to the output size.
#[test]
fn test_writer_extend() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    let duplicate = record.clone();
    let records = vec![record, duplicate];

    let extended = writer.extend(records)?;
    let flushed = writer.flush()?;
    let output = writer.into_inner()?;

    assert_eq!(extended + flushed, output.len());

    let mut one_record: Vec<u8> = Vec::new();
    zig_i64(27, &mut one_record)?;
    zig_i64(3, &mut one_record)?;
    one_record.extend_from_slice(b"foo");
    let payload = one_record.repeat(2);

    assert_eq!(&output[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
    // The last 16 bytes are the sync marker; the payload sits right before it.
    let payload_end = output.len() - 16;
    assert_eq!(
        &output[payload_end - payload.len()..payload_end],
        payload.as_slice()
    );

    Ok(())
}
1122
/// Serde counterpart of the record described by `SCHEMA`.
#[derive(Debug, Clone, Deserialize, Serialize)]
struct TestSerdeSerialize {
    a: i64,
    b: String,
}
1128
/// A serde value appended with `append_ser` ends up right before the trailing
/// sync marker.
#[test]
fn test_writer_append_ser() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let record = TestSerdeSerialize {
        a: 27,
        b: String::from("foo"),
    };

    let appended = writer.append_ser(record)?;
    let flushed = writer.flush()?;
    let output = writer.into_inner()?;

    assert_eq!(appended + flushed, output.len());

    let mut payload: Vec<u8> = Vec::new();
    zig_i64(27, &mut payload)?;
    zig_i64(3, &mut payload)?;
    payload.extend_from_slice(b"foo");

    assert_eq!(&output[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
    // The last 16 bytes are the sync marker; the payload sits right before it.
    let payload_end = output.len() - 16;
    assert_eq!(
        &output[payload_end - payload.len()..payload_end],
        payload.as_slice()
    );

    Ok(())
}
1161
/// `extend_ser` writes all serde values; they end up right before the trailing
/// sync marker.
#[test]
fn test_writer_extend_ser() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let record = TestSerdeSerialize {
        a: 27,
        b: String::from("foo"),
    };
    let duplicate = record.clone();
    let records = vec![record, duplicate];

    let extended = writer.extend_ser(records)?;
    let flushed = writer.flush()?;
    let output = writer.into_inner()?;

    assert_eq!(extended + flushed, output.len());

    let mut one_record: Vec<u8> = Vec::new();
    zig_i64(27, &mut one_record)?;
    zig_i64(3, &mut one_record)?;
    one_record.extend_from_slice(b"foo");
    let payload = one_record.repeat(2);

    assert_eq!(&output[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
    // The last 16 bytes are the sync marker; the payload sits right before it.
    let payload_end = output.len() - 16;
    assert_eq!(
        &output[payload_end - payload.len()..payload_end],
        payload.as_slice()
    );

    Ok(())
}
1197
1198 fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1199 Writer::with_codec(
1200 schema,
1201 Vec::new(),
1202 Codec::Deflate(DeflateSettings::default()),
1203 )
1204 }
1205
1206 fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1207 Writer::builder()
1208 .writer(Vec::new())
1209 .schema(schema)
1210 .codec(Codec::Deflate(DeflateSettings::default()))
1211 .block_size(100)
1212 .build()
1213 }
1214
1215 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1216 let mut record = Record::new(schema).unwrap();
1217 record.put("a", 27i64);
1218 record.put("b", "foo");
1219
1220 let n1 = writer.append(record.clone())?;
1221 let n2 = writer.append(record.clone())?;
1222 let n3 = writer.flush()?;
1223 let result = writer.into_inner()?;
1224
1225 assert_eq!(n1 + n2 + n3, result.len());
1226
1227 let mut data = Vec::new();
1228 zig_i64(27, &mut data)?;
1229 zig_i64(3, &mut data)?;
1230 data.extend(b"foo");
1231 data.extend(data.clone());
1232 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1233
1234 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1236 let last_data_byte = result.len() - 16;
1238 assert_eq!(
1239 &result[last_data_byte - data.len()..last_data_byte],
1240 data.as_slice()
1241 );
1242
1243 Ok(())
1244 }
1245
/// Runs `check_writer` on a writer created with `Writer::with_codec`.
#[test]
fn test_writer_with_codec() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    check_writer(make_writer_with_codec(&schema)?, &schema)
}
1252
/// Runs `check_writer` on a writer created through the builder API.
#[test]
fn test_writer_with_builder() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    check_writer(make_writer_with_builder(&schema)?, &schema)
}
1259
/// Records with a nullable `timestamp-micros` field are written as union
/// index plus value, deflate-compressed, right before the trailing sync marker.
#[test]
fn test_logical_writer() -> TestResult {
    const LOGICAL_TYPE_SCHEMA: &str = r#"
        {
          "type": "record",
          "name": "logical_type_test",
          "fields": [
            {
              "name": "a",
              "type": [
                "null",
                {
                  "type": "long",
                  "logicalType": "timestamp-micros"
                }
              ]
            }
          ]
        }
        "#;
    let codec = Codec::Deflate(DeflateSettings::default());
    let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
    let mut writer = Writer::builder()
        .writer(Vec::new())
        .schema(&schema)
        .codec(codec)
        .build()?;

    let mut with_timestamp = Record::new(&schema).unwrap();
    with_timestamp.put(
        "a",
        Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
    );

    let mut with_null = Record::new(&schema).unwrap();
    with_null.put("a", Value::Union(0, Box::new(Value::Null)));

    let first = writer.append(with_timestamp)?;
    let second = writer.append(with_null)?;
    let flushed = writer.flush()?;
    let output = writer.into_inner()?;

    assert_eq!(first + second + flushed, output.len());

    let mut payload: Vec<u8> = Vec::new();
    // First record: union branch 1 followed by the timestamp.
    zig_i64(1, &mut payload)?;
    zig_i64(1234, &mut payload)?;
    // Second record: union branch 0 (null) carries no value.
    zig_i64(0, &mut payload)?;
    codec.compress(&mut payload)?;

    assert_eq!(&output[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
    // The last 16 bytes are the sync marker; the payload sits right before it.
    let payload_end = output.len() - 16;
    assert_eq!(
        &output[payload_end - payload.len()..payload_end],
        payload.as_slice()
    );

    Ok(())
}
1324
/// User metadata of several value types can be added before the header is
/// written and ends up in the output.
#[test]
fn test_avro_3405_writer_add_metadata_success() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
    writer.add_user_metadata("strKey".to_string(), "strValue")?;
    writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
    writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");

    for _ in 0..2 {
        writer.append(record.clone())?;
    }
    writer.flush()?;
    let output = writer.into_inner()?;

    assert_eq!(output.len(), 260);

    Ok(())
}
1348
/// A file without any records still carries the user metadata in its header.
#[test]
fn test_avro_3881_metadata_empty_body() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;
    writer.add_user_metadata("a".to_string(), "b")?;
    let output = writer.into_inner()?;

    let mut expected_metadata = HashMap::new();
    expected_metadata.insert("a".to_string(), vec![b'b']);

    let reader = Reader::with_schema(&schema, &output[..])?;
    assert_eq!(reader.user_metadata(), &expected_metadata);
    assert_eq!(reader.into_iter().count(), 0);

    Ok(())
}
1364
/// Adding user metadata after data has been appended (and therefore after the
/// header was written) is rejected.
#[test]
fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let mut record = Record::new(&schema).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    writer.append(record.clone())?;

    let outcome = writer
        .add_user_metadata("stringKey".to_string(), String::from("value2"))
        .map_err(Error::into_details);

    match outcome {
        Err(e @ Details::FileHeaderAlreadyWritten) => {
            assert_eq!(e.to_string(), "The file metadata is already flushed.")
        }
        Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
        Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
    }

    Ok(())
}
1388
/// Metadata keys with the reserved `avro.` prefix are rejected.
#[test]
fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let key = "avro.stringKey".to_string();
    let outcome = writer
        .add_user_metadata(key.clone(), "value")
        .map_err(Error::into_details);

    match outcome {
        Err(ref e @ Details::InvalidMetadataKey(_)) => {
            assert_eq!(
                e.to_string(),
                format!(
                    "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
                )
            )
        }
        Err(e) => panic!(
            "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
        ),
        Ok(_) => {
            panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
        }
    }

    Ok(())
}
1417
/// User metadata passed to the builder is stored unchanged in the writer.
#[test]
fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
    let schema = Schema::parse_str(SCHEMA)?;

    let user_meta_data: HashMap<String, Value> = HashMap::from([
        (
            "stringKey".to_string(),
            Value::String("stringValue".to_string()),
        ),
        ("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec())),
        ("vecKey".to_string(), Value::Bytes(vec![1, 2, 3])),
    ]);

    let writer: Writer<'_, Vec<u8>> = Writer::builder()
        .schema(&schema)
        .writer(Vec::new())
        .user_metadata(user_meta_data.clone())
        .build()?;

    assert_eq!(writer.user_metadata, user_meta_data);

    Ok(())
}
1440
1441 #[derive(Serialize, Clone)]
1442 struct TestSingleObjectWriter {
1443 a: i64,
1444 b: f64,
1445 c: Vec<String>,
1446 }
1447
1448 impl AvroSchema for TestSingleObjectWriter {
1449 fn get_schema() -> Schema {
1450 let schema = r#"
1451 {
1452 "type":"record",
1453 "name":"TestSingleObjectWrtierSerialize",
1454 "fields":[
1455 {
1456 "name":"a",
1457 "type":"long"
1458 },
1459 {
1460 "name":"b",
1461 "type":"double"
1462 },
1463 {
1464 "name":"c",
1465 "type":{
1466 "type":"array",
1467 "items":"string"
1468 }
1469 }
1470 ]
1471 }
1472 "#;
1473 Schema::parse_str(schema).unwrap()
1474 }
1475 }
1476
1477 impl From<TestSingleObjectWriter> for Value {
1478 fn from(obj: TestSingleObjectWriter) -> Value {
1479 Value::Record(vec![
1480 ("a".into(), obj.a.into()),
1481 ("b".into(), obj.b.into()),
1482 (
1483 "c".into(),
1484 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1485 ),
1486 ])
1487 }
1488 }
1489
/// Checks the byte layout produced by `GenericSingleObjectWriter`: two marker
/// bytes, the 8-byte Rabin fingerprint of the schema, then the encoded value.
#[test]
fn test_single_object_writer() -> TestResult {
    // Parse the schema once and reuse it for the writer, fingerprint and encoder.
    let schema = TestSingleObjectWriter::get_schema();
    let value: Value = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    }
    .into();

    let mut writer = GenericSingleObjectWriter::new_with_capacity(&schema, 1024)
        .expect("Should resolve schema");

    let mut buf: Vec<u8> = Vec::new();
    let written_bytes = writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    assert!(buf.len() > 10, "no bytes written");
    assert_eq!(buf.len(), written_bytes);

    // Marker bytes.
    assert_eq!(buf[0], 0xC3);
    assert_eq!(buf[1], 0x01);

    // Schema fingerprint.
    let fingerprint = schema.fingerprint::<Rabin>();
    assert_eq!(&buf[2..10], &fingerprint.bytes[..]);

    // Everything after the header must equal the plain encoding of the value.
    let mut msg_binary = Vec::new();
    encode(&value, &schema, &mut msg_binary)
        .expect("encode should have failed by here as a dependency of any writing");
    assert_eq!(&buf[10..], &msg_binary[..]);

    Ok(())
}
1529
/// With a `GlueSchemaUuidHeader`, the output must start with the bytes
/// `0x03 0x00` followed by the 16 bytes of the schema UUID.
#[test]
fn test_single_object_writer_with_header_builder() -> TestResult {
    let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;

    let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
        &TestSingleObjectWriter::get_schema(),
        1024,
        GlueSchemaUuidHeader::from_uuid(schema_uuid),
    )
    .expect("Should resolve schema");

    let value: Value = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    }
    .into();

    let mut buf: Vec<u8> = Vec::new();
    writer
        .write_value_ref(&value, &mut buf)
        .expect("Error serializing properly");

    assert_eq!(buf[0], 0x03);
    assert_eq!(buf[1], 0x00);
    assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
    Ok(())
}
1556
/// The serde path and the `Value` path of `SpecificSingleObjectWriter`, and
/// the `Value` path of `GenericSingleObjectWriter`, must all emit identical bytes.
#[test]
fn test_writer_parity() -> TestResult {
    let fixture = TestSingleObjectWriter {
        a: 300,
        b: 34.555,
        c: vec!["cat".into(), "dog".into()],
    };

    let mut specific_writer =
        SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
            .expect("Resolved should pass");
    let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
        &TestSingleObjectWriter::get_schema(),
        1024,
    )
    .expect("Should resolve schema");

    // Specific writer, serde-based serialization.
    let mut specific_serde_bytes: Vec<u8> = Vec::new();
    specific_writer
        .write(fixture.clone(), &mut specific_serde_bytes)
        .expect("Serialization expected");

    // Specific writer, conversion into `Value`.
    let mut specific_value_bytes: Vec<u8> = Vec::new();
    specific_writer
        .write_value(fixture.clone(), &mut specific_value_bytes)
        .expect("Serialization expected");

    // Generic writer, fed the converted `Value` directly.
    let mut generic_value_bytes: Vec<u8> = Vec::new();
    generic_writer
        .write_value(fixture.into(), &mut generic_value_bytes)
        .expect("Serialization expected");

    assert_eq!(specific_serde_bytes, specific_value_bytes);
    assert_eq!(specific_serde_bytes, generic_value_bytes);

    Ok(())
}
1591
/// AVRO-3894: a struct field named `time` must serialize into the schema field
/// `date`, which lists `time` among its aliases.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
    const CONFERENCE_SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    #[derive(Clone, Debug, Eq, PartialEq, Serialize)]
    pub struct Conference {
        pub name: String,
        pub time: Option<i64>,
    }

    let schema = Schema::parse_str(CONFERENCE_SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let written = writer.append_ser(Conference {
        name: "RustConf".to_string(),
        time: Some(1234567890),
    })?;

    assert_eq!(198, written);

    Ok(())
}
1624
/// AVRO-4014: serializing an `f64` into a field whose schema is `["null", "long"]`
/// must fail with an error that names the field, the record schema and the cause.
#[test]
fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
    const CONFERENCE_SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "Conference",
        "fields": [
            {"type": "string", "name": "name"},
            {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
        ]
    }"#;

    const EXPECTED_MESSAGE: &str = r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#;

    #[derive(Clone, Debug, PartialEq, Serialize)]
    pub struct Conference {
        pub name: String,
        // Deliberately a float: the schema only accepts null or long here.
        pub time: Option<f64>,
    }

    let schema = Schema::parse_str(CONFERENCE_SCHEMA)?;
    let mut writer = Writer::new(&schema, Vec::new())?;

    let mismatched = Conference {
        name: "RustConf".to_string(),
        time: Some(12345678.90),
    };

    let error = match writer.append_ser(mismatched) {
        Err(e) => e,
        Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
    };
    assert_eq!(error.to_string(), EXPECTED_MESSAGE);

    Ok(())
}
1662
/// AVRO-4063: `Writer::flush` must also flush the wrapped writer, so data held
/// back by a `BufWriter` reaches the underlying sink.
#[test]
fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
    const EXAMPLE_SCHEMA: &str = r#"
    {
        "type": "record",
        "name": "ExampleSchema",
        "fields": [
            {"name": "exampleField", "type": "string"}
        ]
    }
    "#;

    /// Sink with shared storage, so the test can inspect it while the
    /// `BufWriter` owns a clone.
    #[derive(Clone, Default)]
    struct TestBuffer(Rc<RefCell<Vec<u8>>>);

    impl TestBuffer {
        /// Number of bytes that have reached the shared storage.
        fn len(&self) -> usize {
            let stored = self.0.borrow();
            stored.len()
        }
    }

    impl Write for TestBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut stored = self.0.borrow_mut();
            stored.write(buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    let sink = TestBuffer::default();
    let schema = Schema::parse_str(EXAMPLE_SCHEMA)?;
    let mut writer = Writer::new(&schema, std::io::BufWriter::new(sink.clone()))?;

    let mut record = Record::new(writer.schema()).unwrap();
    record.put("exampleField", "value");

    writer.append(record)?;
    writer.flush()?;

    assert_eq!(
        sink.len(),
        167,
        "the test buffer was not fully written to after Writer::flush was called"
    );

    Ok(())
}
1716}