1use crate::{
20 AvroResult, Codec, Error,
21 encode::{encode, encode_internal, encode_to_vec},
22 error::Details,
23 headers::{HeaderBuilder, RabinFingerprintHeader},
24 schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25 ser_schema::SchemaAwareWriteSerializer,
26 types::Value,
27};
28use serde::Serialize;
29use std::{
30 collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default value, in bytes, of the `block_size` builder option of `Writer`; it is also
/// used as the initial capacity of the writer's block buffer.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Magic bytes placed at the very beginning of the header produced by `Writer`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Writer that serializes values into a header followed by marker-terminated blocks.
///
/// Appended values are encoded into an in-memory buffer. The buffer is written to the
/// underlying `writer` as one block when it reaches `block_size` bytes or when
/// `flush` is called.
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
    /// Schema that appended values are validated and encoded against.
    schema: &'a Schema,
    /// Destination of the header and of every data block.
    writer: W,
    /// Resolved form of the schema(s). Not settable through the builder; the append
    /// methods initialise it lazily from `schema` when it is still `None`.
    #[builder(skip)]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Codec applied to the buffer before a block is written (defaults to `Codec::Null`).
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Buffer length, in bytes, at which an append triggers a flush.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    /// Encoded values of the block being assembled; pre-allocated with `block_size` capacity.
    #[builder(skip = Vec::with_capacity(block_size))]
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    #[builder(skip)]
    num_values: usize,
    /// 16-byte marker written at the end of the header and after every block.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// `true` once the header has been written, or when the writer was created to append
    /// to a destination that already has one.
    #[builder(default = false)]
    has_header: bool,
    /// Extra key/value pairs emitted in the header metadata map.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
65
66impl<'a, W: Write> Writer<'a, W> {
67 pub fn new(schema: &'a Schema, writer: W) -> Self {
71 Writer::with_codec(schema, writer, Codec::Null)
72 }
73
74 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
77 let mut w = Self::builder()
78 .schema(schema)
79 .writer(writer)
80 .codec(codec)
81 .build();
82 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
83 w
84 }
85
86 pub fn with_schemata(
91 schema: &'a Schema,
92 schemata: Vec<&'a Schema>,
93 writer: W,
94 codec: Codec,
95 ) -> Self {
96 let mut w = Self::builder()
97 .schema(schema)
98 .writer(writer)
99 .codec(codec)
100 .build();
101 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
102 w
103 }
104
105 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
109 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
110 }
111
112 pub fn append_to_with_codec(
115 schema: &'a Schema,
116 writer: W,
117 codec: Codec,
118 marker: [u8; 16],
119 ) -> Self {
120 let mut w = Self::builder()
121 .schema(schema)
122 .writer(writer)
123 .codec(codec)
124 .marker(marker)
125 .has_header(true)
126 .build();
127 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
128 w
129 }
130
131 pub fn append_to_with_codec_schemata(
134 schema: &'a Schema,
135 schemata: Vec<&'a Schema>,
136 writer: W,
137 codec: Codec,
138 marker: [u8; 16],
139 ) -> Self {
140 let mut w = Self::builder()
141 .schema(schema)
142 .writer(writer)
143 .codec(codec)
144 .marker(marker)
145 .has_header(true)
146 .build();
147 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
148 w
149 }
150
    /// Returns the schema this writer was created with.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }
155
156 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
165 let n = self.maybe_write_header()?;
166
167 let avro = value.into();
168 self.append_value_ref(&avro).map(|m| m + n)
169 }
170
171 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
179 let n = self.maybe_write_header()?;
180
181 match self.resolved_schema {
183 Some(ref rs) => {
184 write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
185 self.num_values += 1;
186
187 if self.buffer.len() >= self.block_size {
188 return self.flush().map(|b| b + n);
189 }
190
191 Ok(n)
192 }
193 None => {
194 let rs = ResolvedSchema::try_from(self.schema)?;
195 self.resolved_schema = Some(rs);
196 self.append_value_ref(value)
197 }
198 }
199 }
200
201 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
211 let n = self.maybe_write_header()?;
212
213 match self.resolved_schema {
214 Some(ref rs) => {
215 let mut serializer = SchemaAwareWriteSerializer::new(
216 &mut self.buffer,
217 self.schema,
218 rs.get_names(),
219 None,
220 );
221 value.serialize(&mut serializer)?;
222 self.num_values += 1;
223
224 if self.buffer.len() >= self.block_size {
225 return self.flush().map(|b| b + n);
226 }
227
228 Ok(n)
229 }
230 None => {
231 let rs = ResolvedSchema::try_from(self.schema)?;
232 self.resolved_schema = Some(rs);
233 self.append_ser(value)
234 }
235 }
236 }
237
238 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246 where
247 I: IntoIterator<Item = T>,
248 {
249 let mut num_bytes = 0;
264 for value in values {
265 num_bytes += self.append(value)?;
266 }
267 num_bytes += self.flush()?;
268
269 Ok(num_bytes)
270 }
271
272 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281 where
282 I: IntoIterator<Item = T>,
283 {
284 let mut num_bytes = 0;
299 for value in values {
300 num_bytes += self.append_ser(value)?;
301 }
302 num_bytes += self.flush()?;
303
304 Ok(num_bytes)
305 }
306
307 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315 let mut num_bytes = 0;
316 for value in values {
317 num_bytes += self.append_value_ref(value)?;
318 }
319 num_bytes += self.flush()?;
320
321 Ok(num_bytes)
322 }
323
324 pub fn flush(&mut self) -> AvroResult<usize> {
332 let mut num_bytes = self.maybe_write_header()?;
333 if self.num_values == 0 {
334 return Ok(num_bytes);
335 }
336
337 self.codec.compress(&mut self.buffer)?;
338
339 let num_values = self.num_values;
340 let stream_len = self.buffer.len();
341
342 num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343 + self.append_raw(&stream_len.into(), &Schema::Long)?
344 + self
345 .writer
346 .write(self.buffer.as_ref())
347 .map_err(Details::WriteBytes)?
348 + self.append_marker()?;
349
350 self.buffer.clear();
351 self.num_values = 0;
352
353 self.writer.flush().map_err(Details::FlushWriter)?;
354
355 Ok(num_bytes)
356 }
357
    /// Flushes pending data and returns the underlying writer, consuming `self`.
    ///
    /// # Errors
    /// Fails if writing the header or flushing the pending block fails.
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.maybe_write_header()?;
        self.flush()?;

        // `Writer` implements `Drop`, so its fields cannot be moved out directly.
        // Suppress the destructor and take the struct apart by hand instead.
        let mut this = ManuallyDrop::new(self);

        // Move the heap-owning fields out so they are freed here instead of leaked.
        let _resolved_schema = std::mem::take(&mut this.resolved_schema);
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);

        // SAFETY: `this` is wrapped in `ManuallyDrop` and is not used after this read,
        // so `writer` is read exactly once and never dropped through `this`.
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
378
    /// Returns a shared reference to the underlying writer.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
386
    /// Returns a mutable reference to the underlying writer.
    ///
    /// Bytes written through this reference bypass the block buffer of this `Writer`.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
396
397 fn append_marker(&mut self) -> AvroResult<usize> {
399 self.writer
402 .write(&self.marker)
403 .map_err(|e| Details::WriteMarker(e).into())
404 }
405
406 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
408 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
409 }
410
411 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
413 self.writer
414 .write(bytes)
415 .map_err(|e| Details::WriteBytes(e).into())
416 }
417
418 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
421 if !self.has_header {
422 if key.starts_with("avro.") {
423 return Err(Details::InvalidMetadataKey(key).into());
424 }
425 self.user_metadata
426 .insert(key, Value::Bytes(value.as_ref().to_vec()));
427 Ok(())
428 } else {
429 Err(Details::FileHeaderAlreadyWritten.into())
430 }
431 }
432
433 fn header(&self) -> Result<Vec<u8>, Error> {
435 let schema_bytes = serde_json::to_string(self.schema)
436 .map_err(Details::ConvertJsonToString)?
437 .into_bytes();
438
439 let mut metadata = HashMap::with_capacity(2);
440 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
441 metadata.insert("avro.codec", self.codec.into());
442 match self.codec {
443 #[cfg(feature = "bzip")]
444 Codec::Bzip2(settings) => {
445 metadata.insert(
446 "avro.codec.compression_level",
447 Value::Bytes(vec![settings.compression_level]),
448 );
449 }
450 #[cfg(feature = "xz")]
451 Codec::Xz(settings) => {
452 metadata.insert(
453 "avro.codec.compression_level",
454 Value::Bytes(vec![settings.compression_level]),
455 );
456 }
457 #[cfg(feature = "zstandard")]
458 Codec::Zstandard(settings) => {
459 metadata.insert(
460 "avro.codec.compression_level",
461 Value::Bytes(vec![settings.compression_level]),
462 );
463 }
464 _ => {}
465 }
466
467 for (k, v) in &self.user_metadata {
468 metadata.insert(k.as_str(), v.clone());
469 }
470
471 let mut header = Vec::new();
472 header.extend_from_slice(AVRO_OBJECT_HEADER);
473 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
474 header.extend_from_slice(&self.marker);
475
476 Ok(header)
477 }
478
479 fn maybe_write_header(&mut self) -> AvroResult<usize> {
480 if !self.has_header {
481 let header = self.header()?;
482 let n = self.append_bytes(header.as_ref())?;
483 self.has_header = true;
484 Ok(n)
485 } else {
486 Ok(0)
487 }
488 }
489}
490
/// Best-effort flush when the writer goes out of scope.
///
/// Errors cannot be reported from `drop`, so they are deliberately discarded; call
/// `flush` or `into_inner` explicitly to observe them.
impl<W: Write> Drop for Writer<'_, W> {
    fn drop(&mut self) {
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
498
499fn write_avro_datum<T: Into<Value>, W: Write>(
505 schema: &Schema,
506 value: T,
507 writer: &mut W,
508) -> Result<(), Error> {
509 let avro = value.into();
510 if !avro.validate(schema) {
511 return Err(Details::Validation.into());
512 }
513 encode(&avro, schema, writer)?;
514 Ok(())
515}
516
517fn write_avro_datum_schemata<T: Into<Value>>(
518 schema: &Schema,
519 schemata: Vec<&Schema>,
520 value: T,
521 buffer: &mut Vec<u8>,
522) -> AvroResult<usize> {
523 let avro = value.into();
524 let rs = ResolvedSchema::try_from(schemata)?;
525 let names = rs.get_names();
526 let enclosing_namespace = schema.namespace();
527 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
528 return Err(Details::Validation.into());
529 }
530 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
531}
532
/// Writer that emits each `Value` as a header followed by the encoded datum.
pub struct GenericSingleObjectWriter {
    /// Holds the header bytes; the encoded datum is appended temporarily during a
    /// write and removed again afterwards.
    buffer: Vec<u8>,
    /// Owned, resolved schema used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
540
541impl GenericSingleObjectWriter {
542 pub fn new_with_capacity(
543 schema: &Schema,
544 initial_buffer_cap: usize,
545 ) -> AvroResult<GenericSingleObjectWriter> {
546 let header_builder = RabinFingerprintHeader::from_schema(schema);
547 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
548 }
549
550 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
551 schema: &Schema,
552 initial_buffer_cap: usize,
553 header_builder: HB,
554 ) -> AvroResult<GenericSingleObjectWriter> {
555 let mut buffer = Vec::with_capacity(initial_buffer_cap);
556 let header = header_builder.build_header();
557 buffer.extend_from_slice(&header);
558
559 Ok(GenericSingleObjectWriter {
560 buffer,
561 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
562 })
563 }
564
    /// Buffer lengths accepted at the start of a write, when the buffer is expected to
    /// hold only the header. Presumably covers the sizes of the supported header
    /// formats; verify against the `HeaderBuilder` implementations.
    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
566
567 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
569 let original_length = self.buffer.len();
570 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
571 Err(Details::IllegalSingleObjectWriterState.into())
572 } else {
573 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
574 writer
575 .write_all(&self.buffer)
576 .map_err(Details::WriteBytes)?;
577 let len = self.buffer.len();
578 self.buffer.truncate(original_length);
579 Ok(len)
580 }
581 }
582
    /// Owned-value convenience wrapper around `write_value_ref`.
    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
        self.write_value_ref(&v, writer)
    }
587}
588
/// Single-object writer bound to a Rust type `T` that provides its own schema through
/// `AvroSchema`.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Generic writer used for `Value`-based writes; also the source of the header bytes.
    inner: GenericSingleObjectWriter,
    /// Schema obtained from `T::get_schema()`, used for serde-based writes.
    schema: Schema,
    /// Whether `write_ref` has already emitted the header.
    header_written: bool,
    /// Ties the writer to `T` without storing a value of that type.
    _model: PhantomData<T>,
}
599
600impl<T> SpecificSingleObjectWriter<T>
601where
602 T: AvroSchema,
603{
604 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
605 let schema = T::get_schema();
606 Ok(SpecificSingleObjectWriter {
607 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
608 schema,
609 header_written: false,
610 _model: PhantomData,
611 })
612 }
613}
614
615impl<T> SpecificSingleObjectWriter<T>
616where
617 T: AvroSchema + Into<Value>,
618{
619 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
622 let v: Value = data.into();
623 self.inner.write_value_ref(&v, writer)
624 }
625}
626
627impl<T> SpecificSingleObjectWriter<T>
628where
629 T: AvroSchema + Serialize,
630{
631 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
634 let mut bytes_written: usize = 0;
635
636 if !self.header_written {
637 bytes_written += writer
638 .write(self.inner.buffer.as_slice())
639 .map_err(Details::WriteBytes)?;
640 self.header_written = true;
641 }
642
643 bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
644
645 Ok(bytes_written)
646 }
647
648 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
651 self.write_ref(&data, writer)
652 }
653}
654
655fn write_value_ref_resolved(
656 schema: &Schema,
657 resolved_schema: &ResolvedSchema,
658 value: &Value,
659 buffer: &mut Vec<u8>,
660) -> AvroResult<usize> {
661 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
662 Some(reason) => Err(Details::ValidationWithReason {
663 value: value.clone(),
664 schema: schema.clone(),
665 reason,
666 }
667 .into()),
668 None => encode_internal(
669 value,
670 schema,
671 resolved_schema.get_names(),
672 &schema.namespace(),
673 buffer,
674 ),
675 }
676}
677
678fn write_value_ref_owned_resolved(
679 resolved_schema: &ResolvedOwnedSchema,
680 value: &Value,
681 buffer: &mut Vec<u8>,
682) -> AvroResult<()> {
683 let root_schema = resolved_schema.get_root_schema();
684 if let Some(reason) = value.validate_internal(
685 root_schema,
686 resolved_schema.get_names(),
687 &root_schema.namespace(),
688 ) {
689 return Err(Details::ValidationWithReason {
690 value: value.clone(),
691 schema: root_schema.clone(),
692 reason,
693 }
694 .into());
695 }
696 encode_internal(
697 value,
698 root_schema,
699 resolved_schema.get_names(),
700 &root_schema.namespace(),
701 buffer,
702 )?;
703 Ok(())
704}
705
706pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
713 let mut buffer = Vec::new();
714 write_avro_datum(schema, value, &mut buffer)?;
715 Ok(buffer)
716}
717
718pub fn write_avro_datum_ref<T: Serialize, W: Write>(
725 schema: &Schema,
726 data: &T,
727 writer: &mut W,
728) -> AvroResult<usize> {
729 let names: HashMap<Name, &Schema> = HashMap::new();
730 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
731 let bytes_written = data.serialize(&mut serializer)?;
732 Ok(bytes_written)
733}
734
735pub fn to_avro_datum_schemata<T: Into<Value>>(
740 schema: &Schema,
741 schemata: Vec<&Schema>,
742 value: T,
743) -> AvroResult<Vec<u8>> {
744 let mut buffer = Vec::new();
745 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
746 Ok(buffer)
747}
748
749#[cfg(not(target_arch = "wasm32"))]
750fn generate_sync_marker() -> [u8; 16] {
751 let mut marker = [0_u8; 16];
752 std::iter::repeat_with(rand::random)
753 .take(16)
754 .enumerate()
755 .for_each(|(i, n)| marker[i] = n);
756 marker
757}
758
/// Generates a random 16-byte sync marker on `wasm32` from four `quad_rand::rand`
/// words, each stored big-endian.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    let words = std::iter::repeat_with(quad_rand::rand);
    for (chunk, word) in marker.chunks_exact_mut(4).zip(words) {
        chunk.copy_from_slice(&word.to_be_bytes());
    }
    marker
}
769
770#[cfg(test)]
771mod tests {
772 use std::{cell::RefCell, rc::Rc};
773
774 use super::*;
775 use crate::{
776 Reader,
777 decimal::Decimal,
778 duration::{Days, Duration, Millis, Months},
779 headers::GlueSchemaUuidHeader,
780 rabin::Rabin,
781 schema::{DecimalSchema, FixedSchema, Name},
782 types::Record,
783 util::zig_i64,
784 };
785 use pretty_assertions::assert_eq;
786 use serde::{Deserialize, Serialize};
787 use uuid::Uuid;
788
789 use crate::{codec::DeflateSettings, error::Details};
790 use apache_avro_test_helper::TestResult;
791
    /// Length of the magic bytes at the start of the header.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    /// Record schema shared by most tests: a long field `a` (default 42) and a string field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// Union schema of `null` and `long` used by the union tests.
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
813
814 #[test]
815 fn test_to_avro_datum() -> TestResult {
816 let schema = Schema::parse_str(SCHEMA)?;
817 let mut record = Record::new(&schema).unwrap();
818 record.put("a", 27i64);
819 record.put("b", "foo");
820
821 let mut expected = Vec::new();
822 zig_i64(27, &mut expected)?;
823 zig_i64(3, &mut expected)?;
824 expected.extend([b'f', b'o', b'o']);
825
826 assert_eq!(to_avro_datum(&schema, record)?, expected);
827
828 Ok(())
829 }
830
831 #[test]
832 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
833 #[derive(Serialize)]
834 struct TestStruct {
835 a: i64,
836 b: String,
837 }
838
839 let schema = Schema::parse_str(SCHEMA)?;
840 let mut writer: Vec<u8> = Vec::new();
841 let data = TestStruct {
842 a: 27,
843 b: "foo".to_string(),
844 };
845
846 let mut expected = Vec::new();
847 zig_i64(27, &mut expected)?;
848 zig_i64(3, &mut expected)?;
849 expected.extend([b'f', b'o', b'o']);
850
851 let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
852
853 assert_eq!(bytes, expected.len());
854 assert_eq!(writer, expected);
855
856 Ok(())
857 }
858
859 #[test]
860 fn avro_rs_220_flush_write_header() -> TestResult {
861 let schema = Schema::parse_str(SCHEMA)?;
862
863 let mut writer = Writer::new(&schema, Vec::new());
865 writer.flush()?;
866 let result = writer.into_inner()?;
867 assert_eq!(result.len(), 163);
868
869 let mut writer = Writer::builder()
871 .has_header(true)
872 .schema(&schema)
873 .writer(Vec::new())
874 .build();
875 writer.flush()?;
876 let result = writer.into_inner()?;
877 assert_eq!(result.len(), 0);
878
879 Ok(())
880 }
881
882 #[test]
883 fn test_union_not_null() -> TestResult {
884 let schema = Schema::parse_str(UNION_SCHEMA)?;
885 let union = Value::Union(1, Box::new(Value::Long(3)));
886
887 let mut expected = Vec::new();
888 zig_i64(1, &mut expected)?;
889 zig_i64(3, &mut expected)?;
890
891 assert_eq!(to_avro_datum(&schema, union)?, expected);
892
893 Ok(())
894 }
895
896 #[test]
897 fn test_union_null() -> TestResult {
898 let schema = Schema::parse_str(UNION_SCHEMA)?;
899 let union = Value::Union(0, Box::new(Value::Null));
900
901 let mut expected = Vec::new();
902 zig_i64(0, &mut expected)?;
903
904 assert_eq!(to_avro_datum(&schema, union)?, expected);
905
906 Ok(())
907 }
908
909 fn logical_type_test<T: Into<Value> + Clone>(
910 schema_str: &'static str,
911
912 expected_schema: &Schema,
913 value: Value,
914
915 raw_schema: &Schema,
916 raw_value: T,
917 ) -> TestResult {
918 let schema = Schema::parse_str(schema_str)?;
919 assert_eq!(&schema, expected_schema);
920 let ser = to_avro_datum(&schema, value.clone())?;
922 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
923 assert_eq!(ser, raw_ser);
924
925 let mut r = ser.as_slice();
927 let de = crate::from_avro_datum(&schema, &mut r, None)?;
928 assert_eq!(de, value);
929 Ok(())
930 }
931
932 #[test]
933 fn date() -> TestResult {
934 logical_type_test(
935 r#"{"type": "int", "logicalType": "date"}"#,
936 &Schema::Date,
937 Value::Date(1_i32),
938 &Schema::Int,
939 1_i32,
940 )
941 }
942
943 #[test]
944 fn time_millis() -> TestResult {
945 logical_type_test(
946 r#"{"type": "int", "logicalType": "time-millis"}"#,
947 &Schema::TimeMillis,
948 Value::TimeMillis(1_i32),
949 &Schema::Int,
950 1_i32,
951 )
952 }
953
954 #[test]
955 fn time_micros() -> TestResult {
956 logical_type_test(
957 r#"{"type": "long", "logicalType": "time-micros"}"#,
958 &Schema::TimeMicros,
959 Value::TimeMicros(1_i64),
960 &Schema::Long,
961 1_i64,
962 )
963 }
964
965 #[test]
966 fn timestamp_millis() -> TestResult {
967 logical_type_test(
968 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
969 &Schema::TimestampMillis,
970 Value::TimestampMillis(1_i64),
971 &Schema::Long,
972 1_i64,
973 )
974 }
975
976 #[test]
977 fn timestamp_micros() -> TestResult {
978 logical_type_test(
979 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
980 &Schema::TimestampMicros,
981 Value::TimestampMicros(1_i64),
982 &Schema::Long,
983 1_i64,
984 )
985 }
986
987 #[test]
988 fn decimal_fixed() -> TestResult {
989 let size = 30;
990 let inner = Schema::Fixed(FixedSchema {
991 name: Name::new("decimal")?,
992 aliases: None,
993 doc: None,
994 size,
995 default: None,
996 attributes: Default::default(),
997 });
998 let value = vec![0u8; size];
999 logical_type_test(
1000 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1001 &Schema::Decimal(DecimalSchema {
1002 precision: 20,
1003 scale: 5,
1004 inner: Box::new(inner.clone()),
1005 }),
1006 Value::Decimal(Decimal::from(value.clone())),
1007 &inner,
1008 Value::Fixed(size, value),
1009 )
1010 }
1011
1012 #[test]
1013 fn decimal_bytes() -> TestResult {
1014 let inner = Schema::Bytes;
1015 let value = vec![0u8; 10];
1016 logical_type_test(
1017 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1018 &Schema::Decimal(DecimalSchema {
1019 precision: 4,
1020 scale: 3,
1021 inner: Box::new(inner.clone()),
1022 }),
1023 Value::Decimal(Decimal::from(value.clone())),
1024 &inner,
1025 value,
1026 )
1027 }
1028
1029 #[test]
1030 fn duration() -> TestResult {
1031 let inner = Schema::Fixed(FixedSchema {
1032 name: Name::new("duration")?,
1033 aliases: None,
1034 doc: None,
1035 size: 12,
1036 default: None,
1037 attributes: Default::default(),
1038 });
1039 let value = Value::Duration(Duration::new(
1040 Months::new(256),
1041 Days::new(512),
1042 Millis::new(1024),
1043 ));
1044 logical_type_test(
1045 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1046 &Schema::Duration,
1047 value,
1048 &inner,
1049 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1050 )
1051 }
1052
1053 #[test]
1054 fn test_writer_append() -> TestResult {
1055 let schema = Schema::parse_str(SCHEMA)?;
1056 let mut writer = Writer::new(&schema, Vec::new());
1057
1058 let mut record = Record::new(&schema).unwrap();
1059 record.put("a", 27i64);
1060 record.put("b", "foo");
1061
1062 let n1 = writer.append(record.clone())?;
1063 let n2 = writer.append(record.clone())?;
1064 let n3 = writer.flush()?;
1065 let result = writer.into_inner()?;
1066
1067 assert_eq!(n1 + n2 + n3, result.len());
1068
1069 let mut data = Vec::new();
1070 zig_i64(27, &mut data)?;
1071 zig_i64(3, &mut data)?;
1072 data.extend(b"foo");
1073 data.extend(data.clone());
1074
1075 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1077 let last_data_byte = result.len() - 16;
1079 assert_eq!(
1080 &result[last_data_byte - data.len()..last_data_byte],
1081 data.as_slice()
1082 );
1083
1084 Ok(())
1085 }
1086
1087 #[test]
1088 fn test_writer_extend() -> TestResult {
1089 let schema = Schema::parse_str(SCHEMA)?;
1090 let mut writer = Writer::new(&schema, Vec::new());
1091
1092 let mut record = Record::new(&schema).unwrap();
1093 record.put("a", 27i64);
1094 record.put("b", "foo");
1095 let record_copy = record.clone();
1096 let records = vec![record, record_copy];
1097
1098 let n1 = writer.extend(records)?;
1099 let n2 = writer.flush()?;
1100 let result = writer.into_inner()?;
1101
1102 assert_eq!(n1 + n2, result.len());
1103
1104 let mut data = Vec::new();
1105 zig_i64(27, &mut data)?;
1106 zig_i64(3, &mut data)?;
1107 data.extend(b"foo");
1108 data.extend(data.clone());
1109
1110 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1112 let last_data_byte = result.len() - 16;
1114 assert_eq!(
1115 &result[last_data_byte - data.len()..last_data_byte],
1116 data.as_slice()
1117 );
1118
1119 Ok(())
1120 }
1121
    /// Serde counterpart of the record described by `SCHEMA`, used by the `*_ser` tests.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
1127
1128 #[test]
1129 fn test_writer_append_ser() -> TestResult {
1130 let schema = Schema::parse_str(SCHEMA)?;
1131 let mut writer = Writer::new(&schema, Vec::new());
1132
1133 let record = TestSerdeSerialize {
1134 a: 27,
1135 b: "foo".to_owned(),
1136 };
1137
1138 let n1 = writer.append_ser(record)?;
1139 let n2 = writer.flush()?;
1140 let result = writer.into_inner()?;
1141
1142 assert_eq!(n1 + n2, result.len());
1143
1144 let mut data = Vec::new();
1145 zig_i64(27, &mut data)?;
1146 zig_i64(3, &mut data)?;
1147 data.extend(b"foo");
1148
1149 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1151 let last_data_byte = result.len() - 16;
1153 assert_eq!(
1154 &result[last_data_byte - data.len()..last_data_byte],
1155 data.as_slice()
1156 );
1157
1158 Ok(())
1159 }
1160
1161 #[test]
1162 fn test_writer_extend_ser() -> TestResult {
1163 let schema = Schema::parse_str(SCHEMA)?;
1164 let mut writer = Writer::new(&schema, Vec::new());
1165
1166 let record = TestSerdeSerialize {
1167 a: 27,
1168 b: "foo".to_owned(),
1169 };
1170 let record_copy = record.clone();
1171 let records = vec![record, record_copy];
1172
1173 let n1 = writer.extend_ser(records)?;
1174 let n2 = writer.flush()?;
1175 let result = writer.into_inner()?;
1176
1177 assert_eq!(n1 + n2, result.len());
1178
1179 let mut data = Vec::new();
1180 zig_i64(27, &mut data)?;
1181 zig_i64(3, &mut data)?;
1182 data.extend(b"foo");
1183 data.extend(data.clone());
1184
1185 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1187 let last_data_byte = result.len() - 16;
1189 assert_eq!(
1190 &result[last_data_byte - data.len()..last_data_byte],
1191 data.as_slice()
1192 );
1193
1194 Ok(())
1195 }
1196
1197 fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1198 Writer::with_codec(
1199 schema,
1200 Vec::new(),
1201 Codec::Deflate(DeflateSettings::default()),
1202 )
1203 }
1204
1205 fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1206 Writer::builder()
1207 .writer(Vec::new())
1208 .schema(schema)
1209 .codec(Codec::Deflate(DeflateSettings::default()))
1210 .block_size(100)
1211 .build()
1212 }
1213
1214 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1215 let mut record = Record::new(schema).unwrap();
1216 record.put("a", 27i64);
1217 record.put("b", "foo");
1218
1219 let n1 = writer.append(record.clone())?;
1220 let n2 = writer.append(record.clone())?;
1221 let n3 = writer.flush()?;
1222 let result = writer.into_inner()?;
1223
1224 assert_eq!(n1 + n2 + n3, result.len());
1225
1226 let mut data = Vec::new();
1227 zig_i64(27, &mut data)?;
1228 zig_i64(3, &mut data)?;
1229 data.extend(b"foo");
1230 data.extend(data.clone());
1231 Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1232
1233 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1235 let last_data_byte = result.len() - 16;
1237 assert_eq!(
1238 &result[last_data_byte - data.len()..last_data_byte],
1239 data.as_slice()
1240 );
1241
1242 Ok(())
1243 }
1244
1245 #[test]
1246 fn test_writer_with_codec() -> TestResult {
1247 let schema = Schema::parse_str(SCHEMA)?;
1248 let writer = make_writer_with_codec(&schema);
1249 check_writer(writer, &schema)
1250 }
1251
1252 #[test]
1253 fn test_writer_with_builder() -> TestResult {
1254 let schema = Schema::parse_str(SCHEMA)?;
1255 let writer = make_writer_with_builder(&schema);
1256 check_writer(writer, &schema)
1257 }
1258
1259 #[test]
1260 fn test_logical_writer() -> TestResult {
1261 const LOGICAL_TYPE_SCHEMA: &str = r#"
1262 {
1263 "type": "record",
1264 "name": "logical_type_test",
1265 "fields": [
1266 {
1267 "name": "a",
1268 "type": [
1269 "null",
1270 {
1271 "type": "long",
1272 "logicalType": "timestamp-micros"
1273 }
1274 ]
1275 }
1276 ]
1277 }
1278 "#;
1279 let codec = Codec::Deflate(DeflateSettings::default());
1280 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1281 let mut writer = Writer::builder()
1282 .schema(&schema)
1283 .codec(codec)
1284 .writer(Vec::new())
1285 .build();
1286
1287 let mut record1 = Record::new(&schema).unwrap();
1288 record1.put(
1289 "a",
1290 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1291 );
1292
1293 let mut record2 = Record::new(&schema).unwrap();
1294 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1295
1296 let n1 = writer.append(record1)?;
1297 let n2 = writer.append(record2)?;
1298 let n3 = writer.flush()?;
1299 let result = writer.into_inner()?;
1300
1301 assert_eq!(n1 + n2 + n3, result.len());
1302
1303 let mut data = Vec::new();
1304 zig_i64(1, &mut data)?;
1306 zig_i64(1234, &mut data)?;
1307
1308 zig_i64(0, &mut data)?;
1310 codec.compress(&mut data)?;
1311
1312 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1314 let last_data_byte = result.len() - 16;
1316 assert_eq!(
1317 &result[last_data_byte - data.len()..last_data_byte],
1318 data.as_slice()
1319 );
1320
1321 Ok(())
1322 }
1323
1324 #[test]
1325 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1326 let schema = Schema::parse_str(SCHEMA)?;
1327 let mut writer = Writer::new(&schema, Vec::new());
1328
1329 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1330 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1331 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1332 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1333
1334 let mut record = Record::new(&schema).unwrap();
1335 record.put("a", 27i64);
1336 record.put("b", "foo");
1337
1338 writer.append(record.clone())?;
1339 writer.append(record.clone())?;
1340 writer.flush()?;
1341 let result = writer.into_inner()?;
1342
1343 assert_eq!(result.len(), 260);
1344
1345 Ok(())
1346 }
1347
1348 #[test]
1349 fn test_avro_3881_metadata_empty_body() -> TestResult {
1350 let schema = Schema::parse_str(SCHEMA)?;
1351 let mut writer = Writer::new(&schema, Vec::new());
1352 writer.add_user_metadata("a".to_string(), "b")?;
1353 let result = writer.into_inner()?;
1354
1355 let reader = Reader::with_schema(&schema, &result[..])?;
1356 let mut expected = HashMap::new();
1357 expected.insert("a".to_string(), vec![b'b']);
1358 assert_eq!(reader.user_metadata(), &expected);
1359 assert_eq!(reader.into_iter().count(), 0);
1360
1361 Ok(())
1362 }
1363
1364 #[test]
1365 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1366 let schema = Schema::parse_str(SCHEMA)?;
1367 let mut writer = Writer::new(&schema, Vec::new());
1368
1369 let mut record = Record::new(&schema).unwrap();
1370 record.put("a", 27i64);
1371 record.put("b", "foo");
1372 writer.append(record.clone())?;
1373
1374 match writer
1375 .add_user_metadata("stringKey".to_string(), String::from("value2"))
1376 .map_err(Error::into_details)
1377 {
1378 Err(e @ Details::FileHeaderAlreadyWritten) => {
1379 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1380 }
1381 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1382 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1383 }
1384
1385 Ok(())
1386 }
1387
1388 #[test]
1389 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1390 let schema = Schema::parse_str(SCHEMA)?;
1391 let mut writer = Writer::new(&schema, Vec::new());
1392
1393 let key = "avro.stringKey".to_string();
1394 match writer
1395 .add_user_metadata(key.clone(), "value")
1396 .map_err(Error::into_details)
1397 {
1398 Err(ref e @ Details::InvalidMetadataKey(_)) => {
1399 assert_eq!(
1400 e.to_string(),
1401 format!(
1402 "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1403 )
1404 )
1405 }
1406 Err(e) => panic!(
1407 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1408 ),
1409 Ok(_) => {
1410 panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1411 }
1412 }
1413
1414 Ok(())
1415 }
1416
1417 #[test]
1418 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1419 let schema = Schema::parse_str(SCHEMA)?;
1420
1421 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1422 user_meta_data.insert(
1423 "stringKey".to_string(),
1424 Value::String("stringValue".to_string()),
1425 );
1426 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1427 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1428
1429 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1430 .writer(Vec::new())
1431 .schema(&schema)
1432 .user_metadata(user_meta_data.clone())
1433 .build();
1434
1435 assert_eq!(writer.user_metadata, user_meta_data);
1436
1437 Ok(())
1438 }
1439
1440 #[derive(Serialize, Clone)]
1441 struct TestSingleObjectWriter {
1442 a: i64,
1443 b: f64,
1444 c: Vec<String>,
1445 }
1446
1447 impl AvroSchema for TestSingleObjectWriter {
1448 fn get_schema() -> Schema {
1449 let schema = r#"
1450 {
1451 "type":"record",
1452 "name":"TestSingleObjectWrtierSerialize",
1453 "fields":[
1454 {
1455 "name":"a",
1456 "type":"long"
1457 },
1458 {
1459 "name":"b",
1460 "type":"double"
1461 },
1462 {
1463 "name":"c",
1464 "type":{
1465 "type":"array",
1466 "items":"string"
1467 }
1468 }
1469 ]
1470 }
1471 "#;
1472 Schema::parse_str(schema).unwrap()
1473 }
1474 }
1475
1476 impl From<TestSingleObjectWriter> for Value {
1477 fn from(obj: TestSingleObjectWriter) -> Value {
1478 Value::Record(vec![
1479 ("a".into(), obj.a.into()),
1480 ("b".into(), obj.b.into()),
1481 (
1482 "c".into(),
1483 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1484 ),
1485 ])
1486 }
1487 }
1488
1489 #[test]
1490 fn test_single_object_writer() -> TestResult {
1491 let mut buf: Vec<u8> = Vec::new();
1492 let obj = TestSingleObjectWriter {
1493 a: 300,
1494 b: 34.555,
1495 c: vec!["cat".into(), "dog".into()],
1496 };
1497 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1498 &TestSingleObjectWriter::get_schema(),
1499 1024,
1500 )
1501 .expect("Should resolve schema");
1502 let value = obj.into();
1503 let written_bytes = writer
1504 .write_value_ref(&value, &mut buf)
1505 .expect("Error serializing properly");
1506
1507 assert!(buf.len() > 10, "no bytes written");
1508 assert_eq!(buf.len(), written_bytes);
1509 assert_eq!(buf[0], 0xC3);
1510 assert_eq!(buf[1], 0x01);
1511 assert_eq!(
1512 &buf[2..10],
1513 &TestSingleObjectWriter::get_schema()
1514 .fingerprint::<Rabin>()
1515 .bytes[..]
1516 );
1517 let mut msg_binary = Vec::new();
1518 encode(
1519 &value,
1520 &TestSingleObjectWriter::get_schema(),
1521 &mut msg_binary,
1522 )
1523 .expect("encode should have failed by here as a dependency of any writing");
1524 assert_eq!(&buf[10..], &msg_binary[..]);
1525
1526 Ok(())
1527 }
1528
1529 #[test]
1530 fn test_single_object_writer_with_header_builder() -> TestResult {
1531 let mut buf: Vec<u8> = Vec::new();
1532 let obj = TestSingleObjectWriter {
1533 a: 300,
1534 b: 34.555,
1535 c: vec!["cat".into(), "dog".into()],
1536 };
1537 let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1538 let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1539 let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1540 &TestSingleObjectWriter::get_schema(),
1541 1024,
1542 header_builder,
1543 )
1544 .expect("Should resolve schema");
1545 let value = obj.into();
1546 writer
1547 .write_value_ref(&value, &mut buf)
1548 .expect("Error serializing properly");
1549
1550 assert_eq!(buf[0], 0x03);
1551 assert_eq!(buf[1], 0x00);
1552 assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1553 Ok(())
1554 }
1555
1556 #[test]
1557 fn test_writer_parity() -> TestResult {
1558 let obj1 = TestSingleObjectWriter {
1559 a: 300,
1560 b: 34.555,
1561 c: vec!["cat".into(), "dog".into()],
1562 };
1563
1564 let mut buf1: Vec<u8> = Vec::new();
1565 let mut buf2: Vec<u8> = Vec::new();
1566 let mut buf3: Vec<u8> = Vec::new();
1567
1568 let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1569 &TestSingleObjectWriter::get_schema(),
1570 1024,
1571 )
1572 .expect("Should resolve schema");
1573 let mut specific_writer =
1574 SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1575 .expect("Resolved should pass");
1576 specific_writer
1577 .write(obj1.clone(), &mut buf1)
1578 .expect("Serialization expected");
1579 specific_writer
1580 .write_value(obj1.clone(), &mut buf2)
1581 .expect("Serialization expected");
1582 generic_writer
1583 .write_value(obj1.into(), &mut buf3)
1584 .expect("Serialization expected");
1585 assert_eq!(buf1, buf2);
1586 assert_eq!(buf1, buf3);
1587
1588 Ok(())
1589 }
1590
1591 #[test]
1592 fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1593 const SCHEMA: &str = r#"
1594 {
1595 "type": "record",
1596 "name": "Conference",
1597 "fields": [
1598 {"type": "string", "name": "name"},
1599 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1600 ]
1601 }"#;
1602
1603 #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1604 pub struct Conference {
1605 pub name: String,
1606 pub time: Option<i64>,
1607 }
1608
1609 let conf = Conference {
1610 name: "RustConf".to_string(),
1611 time: Some(1234567890),
1612 };
1613
1614 let schema = Schema::parse_str(SCHEMA)?;
1615 let mut writer = Writer::new(&schema, Vec::new());
1616
1617 let bytes = writer.append_ser(conf)?;
1618
1619 assert_eq!(198, bytes);
1620
1621 Ok(())
1622 }
1623
1624 #[test]
1625 fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1626 const SCHEMA: &str = r#"
1627 {
1628 "type": "record",
1629 "name": "Conference",
1630 "fields": [
1631 {"type": "string", "name": "name"},
1632 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1633 ]
1634 }"#;
1635
1636 #[derive(Debug, PartialEq, Clone, Serialize)]
1637 pub struct Conference {
1638 pub name: String,
1639 pub time: Option<f64>, }
1641
1642 let conf = Conference {
1643 name: "RustConf".to_string(),
1644 time: Some(12345678.90),
1645 };
1646
1647 let schema = Schema::parse_str(SCHEMA)?;
1648 let mut writer = Writer::new(&schema, Vec::new());
1649
1650 match writer.append_ser(conf) {
1651 Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1652 Err(e) => {
1653 assert_eq!(
1654 e.to_string(),
1655 r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
1656 );
1657 }
1658 }
1659 Ok(())
1660 }
1661
1662 #[test]
1663 fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1664 const SCHEMA: &str = r#"
1665 {
1666 "type": "record",
1667 "name": "ExampleSchema",
1668 "fields": [
1669 {"name": "exampleField", "type": "string"}
1670 ]
1671 }
1672 "#;
1673
1674 #[derive(Clone, Default)]
1675 struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1676
1677 impl TestBuffer {
1678 fn len(&self) -> usize {
1679 self.0.borrow().len()
1680 }
1681 }
1682
1683 impl Write for TestBuffer {
1684 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1685 self.0.borrow_mut().write(buf)
1686 }
1687
1688 fn flush(&mut self) -> std::io::Result<()> {
1689 Ok(())
1690 }
1691 }
1692
1693 let shared_buffer = TestBuffer::default();
1694
1695 let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1696
1697 let schema = Schema::parse_str(SCHEMA)?;
1698
1699 let mut writer = Writer::new(&schema, buffered_writer);
1700
1701 let mut record = Record::new(writer.schema()).unwrap();
1702 record.put("exampleField", "value");
1703
1704 writer.append(record)?;
1705 writer.flush()?;
1706
1707 assert_eq!(
1708 shared_buffer.len(),
1709 167,
1710 "the test buffer was not fully written to after Writer::flush was called"
1711 );
1712
1713 Ok(())
1714 }
1715}