1use crate::{
22 bigdecimal::big_decimal_as_bytes,
23 encode::{encode_int, encode_long},
24 error::{Details, Error},
25 schema::{Name, NamesRef, Namespace, RecordField, RecordSchema, Schema},
26};
27use bigdecimal::BigDecimal;
28use serde::{Serialize, ser};
29use std::{borrow::Cow, io::Write, str::FromStr};
30
/// Number of buffered items above which a collection serializer flushes its buffers to the writer.
const COLLECTION_SERIALIZER_ITEM_LIMIT: usize = 1024;
/// Initial capacity of the item buffer list when the collection length is not known up front.
const COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY: usize = 32;
/// Initial capacity, in bytes, of the scratch buffer used to serialize a single collection item.
const SINGLE_VALUE_INIT_BUFFER_SIZE: usize = 128;
34
/// Serializer state for a sequence whose items are first serialized into
/// per-item buffers and later flushed to the parent serializer's writer.
pub struct SchemaAwareWriteSerializeSeq<'a, 's, W: Write> {
    /// Parent serializer; owns the destination writer, names and namespace.
    ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
    /// Schema every item is serialized with.
    item_schema: &'s Schema,
    /// Capacity used for the next item buffer; grows with the largest item seen.
    item_buffer_size: usize,
    /// Serialized items that have not been written to the writer yet.
    item_buffers: Vec<Vec<u8>>,
    /// Running total of bytes written to the writer.
    bytes_written: usize,
}
48
49impl<'a, 's, W: Write> SchemaAwareWriteSerializeSeq<'a, 's, W> {
50 fn new(
51 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
52 item_schema: &'s Schema,
53 len: Option<usize>,
54 ) -> SchemaAwareWriteSerializeSeq<'a, 's, W> {
55 SchemaAwareWriteSerializeSeq {
56 ser,
57 item_schema,
58 item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
59 item_buffers: Vec::with_capacity(
60 len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY),
61 ),
62 bytes_written: 0,
63 }
64 }
65
66 fn write_buffered_items(&mut self) -> Result<(), Error> {
67 if !self.item_buffers.is_empty() {
68 self.bytes_written +=
69 encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
70 for item in self.item_buffers.drain(..) {
71 self.bytes_written += self
72 .ser
73 .writer
74 .write(item.as_slice())
75 .map_err(Details::WriteBytes)?;
76 }
77 }
78
79 Ok(())
80 }
81
82 fn serialize_element<T: ser::Serialize>(&mut self, value: &T) -> Result<(), Error> {
83 let mut item_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
84 let mut item_ser = SchemaAwareWriteSerializer::new(
85 &mut item_buffer,
86 self.item_schema,
87 self.ser.names,
88 self.ser.enclosing_namespace.clone(),
89 );
90 value.serialize(&mut item_ser)?;
91
92 self.item_buffer_size = std::cmp::max(self.item_buffer_size, item_buffer.len() + 16);
93
94 self.item_buffers.push(item_buffer);
95
96 if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
97 self.write_buffered_items()?;
98 }
99
100 Ok(())
101 }
102
103 fn end(mut self) -> Result<usize, Error> {
104 self.write_buffered_items()?;
105 self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Details::WriteBytes)?;
106
107 Ok(self.bytes_written)
108 }
109}
110
/// `serde` sequence support; both methods forward to the inherent methods of
/// `SchemaAwareWriteSerializeSeq`.
impl<W: Write> ser::SerializeSeq for SchemaAwareWriteSerializeSeq<'_, '_, W> {
    type Ok = usize;
    type Error = Error;

    /// Serializes one element of the sequence.
    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
    where
        T: ?Sized + ser::Serialize,
    {
        // Resolves to the inherent `serialize_element` (inherent methods take
        // priority over trait methods); `&value` is passed because the
        // inherent method requires a sized `T`.
        self.serialize_element(&value)
    }

    /// Finishes the sequence and returns the number of bytes written.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        // Resolves to the inherent `end`.
        self.end()
    }
}
126
/// `serde` tuple support; tuples are serialized exactly like sequences by
/// forwarding to the `ser::SerializeSeq` implementation.
impl<W: Write> ser::SerializeTuple for SchemaAwareWriteSerializeSeq<'_, '_, W> {
    type Ok = usize;
    type Error = Error;

    /// Serializes one tuple element via `ser::SerializeSeq::serialize_element`.
    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
    where
        T: ?Sized + ser::Serialize,
    {
        ser::SerializeSeq::serialize_element(self, value)
    }

    /// Finishes the tuple via `ser::SerializeSeq::end`.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        ser::SerializeSeq::end(self)
    }
}
142
/// Serializer state for a map whose entries (key followed by value) are first
/// serialized into per-entry buffers and later flushed to the parent
/// serializer's writer.
pub struct SchemaAwareWriteSerializeMap<'a, 's, W: Write> {
    /// Parent serializer; owns the destination writer, names and namespace.
    ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
    /// Schema every map value is serialized with.
    item_schema: &'s Schema,
    /// Capacity used for the next entry buffer; grows with the largest entry seen.
    item_buffer_size: usize,
    /// Serialized entries that have not been written to the writer yet.
    item_buffers: Vec<Vec<u8>>,
    /// Running total of bytes written to the writer.
    bytes_written: usize,
}
156
157impl<'a, 's, W: Write> SchemaAwareWriteSerializeMap<'a, 's, W> {
158 fn new(
159 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
160 item_schema: &'s Schema,
161 len: Option<usize>,
162 ) -> SchemaAwareWriteSerializeMap<'a, 's, W> {
163 SchemaAwareWriteSerializeMap {
164 ser,
165 item_schema,
166 item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
167 item_buffers: Vec::with_capacity(
168 len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY),
169 ),
170 bytes_written: 0,
171 }
172 }
173
174 fn write_buffered_items(&mut self) -> Result<(), Error> {
175 if !self.item_buffers.is_empty() {
176 self.bytes_written +=
177 encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
178 for item in self.item_buffers.drain(..) {
179 self.bytes_written += self
180 .ser
181 .writer
182 .write(item.as_slice())
183 .map_err(Details::WriteBytes)?;
184 }
185 }
186
187 Ok(())
188 }
189}
190
191impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeMap<'_, '_, W> {
192 type Ok = usize;
193 type Error = Error;
194
195 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
196 where
197 T: ?Sized + ser::Serialize,
198 {
199 let mut element_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
200 let string_schema = Schema::String;
201 let mut key_ser = SchemaAwareWriteSerializer::new(
202 &mut element_buffer,
203 &string_schema,
204 self.ser.names,
205 self.ser.enclosing_namespace.clone(),
206 );
207 key.serialize(&mut key_ser)?;
208
209 self.item_buffers.push(element_buffer);
210
211 Ok(())
212 }
213
214 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
215 where
216 T: ?Sized + ser::Serialize,
217 {
218 let last_index = self.item_buffers.len() - 1;
219 let element_buffer = &mut self.item_buffers[last_index];
220 let mut val_ser = SchemaAwareWriteSerializer::new(
221 element_buffer,
222 self.item_schema,
223 self.ser.names,
224 self.ser.enclosing_namespace.clone(),
225 );
226 value.serialize(&mut val_ser)?;
227
228 self.item_buffer_size = std::cmp::max(self.item_buffer_size, element_buffer.len() + 16);
229
230 if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
231 self.write_buffered_items()?;
232 }
233
234 Ok(())
235 }
236
237 fn end(mut self) -> Result<Self::Ok, Self::Error> {
238 self.write_buffered_items()?;
239 self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Details::WriteBytes)?;
240
241 Ok(self.bytes_written)
242 }
243}
244
/// Serializer state for a struct that is written field by field, directly to
/// the parent serializer's writer, according to a record schema.
pub struct SchemaAwareWriteSerializeStruct<'a, 's, W: Write> {
    /// Parent serializer; owns the destination writer, names and namespace.
    ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
    /// Record schema used to look up each field's schema by name.
    record_schema: &'s RecordSchema,
    /// Running total of bytes written for this struct.
    bytes_written: usize,
}
254
255impl<'a, 's, W: Write> SchemaAwareWriteSerializeStruct<'a, 's, W> {
256 fn new(
257 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
258 record_schema: &'s RecordSchema,
259 ) -> SchemaAwareWriteSerializeStruct<'a, 's, W> {
260 SchemaAwareWriteSerializeStruct {
261 ser,
262 record_schema,
263 bytes_written: 0,
264 }
265 }
266
267 fn serialize_next_field<T>(&mut self, field: &RecordField, value: &T) -> Result<(), Error>
268 where
269 T: ?Sized + ser::Serialize,
270 {
271 let mut value_ser = SchemaAwareWriteSerializer::new(
273 &mut *self.ser.writer,
274 &field.schema,
275 self.ser.names,
276 self.ser.enclosing_namespace.clone(),
277 );
278 self.bytes_written += value.serialize(&mut value_ser)?;
279
280 Ok(())
281 }
282
283 fn end(self) -> Result<usize, Error> {
284 Ok(self.bytes_written)
285 }
286}
287
288impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_, W> {
289 type Ok = usize;
290 type Error = Error;
291
292 fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
293 where
294 T: ?Sized + ser::Serialize,
295 {
296 let record_field = self
297 .record_schema
298 .lookup
299 .get(key)
300 .and_then(|idx| self.record_schema.fields.get(*idx));
301
302 match record_field {
303 Some(field) => {
304 self.serialize_next_field(field, value).map_err(|e| {
306 Details::SerializeRecordFieldWithSchema {
307 field_name: key,
308 record_schema: Schema::Record(self.record_schema.clone()),
309 error: Box::new(e),
310 }
311 .into()
312 })
313 }
314 None => Err(Details::FieldName(String::from(key)).into()),
315 }
316 }
317
318 fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> {
319 let skipped_field = self
320 .record_schema
321 .lookup
322 .get(key)
323 .and_then(|idx| self.record_schema.fields.get(*idx));
324
325 if let Some(skipped_field) = skipped_field {
326 skipped_field
328 .default
329 .serialize(&mut SchemaAwareWriteSerializer::new(
330 self.ser.writer,
331 &skipped_field.schema,
332 self.ser.names,
333 self.ser.enclosing_namespace.clone(),
334 ))?;
335 } else {
336 return Err(Details::GetField(key.to_string()).into());
337 }
338
339 Ok(())
340 }
341
342 fn end(self) -> Result<Self::Ok, Self::Error> {
343 self.end()
344 }
345}
346
/// `serde` struct-variant support; forwards to the `ser::SerializeStruct`
/// implementation.
impl<W: Write> ser::SerializeStructVariant for SchemaAwareWriteSerializeStruct<'_, '_, W> {
    type Ok = usize;
    type Error = Error;

    /// Serializes one field via `ser::SerializeStruct::serialize_field`.
    fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
    where
        T: ?Sized + ser::Serialize,
    {
        ser::SerializeStruct::serialize_field(self, key, value)
    }

    /// Finishes the variant via `ser::SerializeStruct::end`.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        ser::SerializeStruct::end(self)
    }
}
362
/// Serializer state for tuple structs and tuple variants, backed either by a
/// record (struct) serializer or by an array (sequence) serializer.
pub enum SchemaAwareWriteSerializeTupleStruct<'a, 's, W: Write> {
    /// Backed by a struct serializer.
    Record(SchemaAwareWriteSerializeStruct<'a, 's, W>),
    /// Backed by a sequence serializer.
    Array(SchemaAwareWriteSerializeSeq<'a, 's, W>),
}
370
371impl<W: Write> SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
372 fn serialize_field<T>(&mut self, value: &T) -> Result<(), Error>
373 where
374 T: ?Sized + ser::Serialize,
375 {
376 use SchemaAwareWriteSerializeTupleStruct::*;
377 match self {
378 Record(_record_ser) => {
379 unimplemented!("Tuple struct serialization to record is not supported!");
380 }
381 Array(array_ser) => array_ser.serialize_element(&value),
382 }
383 }
384
385 fn end(self) -> Result<usize, Error> {
386 use SchemaAwareWriteSerializeTupleStruct::*;
387 match self {
388 Record(record_ser) => record_ser.end(),
389 Array(array_ser) => array_ser.end(),
390 }
391 }
392}
393
/// `serde` tuple-struct support; both methods forward to the inherent methods
/// of `SchemaAwareWriteSerializeTupleStruct`.
impl<W: Write> ser::SerializeTupleStruct for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
    type Ok = usize;
    type Error = Error;

    /// Serializes one positional field.
    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
    where
        T: ?Sized + ser::Serialize,
    {
        // Resolves to the inherent `serialize_field` (inherent methods take
        // priority over trait methods).
        self.serialize_field(&value)
    }

    /// Finishes the tuple struct and returns the number of bytes written.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        // Resolves to the inherent `end`.
        self.end()
    }
}
409
/// `serde` tuple-variant support; both methods forward to the inherent
/// methods of `SchemaAwareWriteSerializeTupleStruct`.
impl<W: Write> ser::SerializeTupleVariant for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
    type Ok = usize;
    type Error = Error;

    /// Serializes one positional field.
    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
    where
        T: ?Sized + ser::Serialize,
    {
        // Resolves to the inherent `serialize_field` (inherent methods take
        // priority over trait methods).
        self.serialize_field(&value)
    }

    /// Finishes the tuple variant and returns the number of bytes written.
    fn end(self) -> Result<Self::Ok, Self::Error> {
        // Resolves to the inherent `end`.
        self.end()
    }
}
425
/// Serializer that writes values to `writer` according to a schema.
pub struct SchemaAwareWriteSerializer<'s, W: Write> {
    /// Destination of the serialized bytes.
    writer: &'s mut W,
    /// Schema the serialized value must conform to.
    root_schema: &'s Schema,
    /// Named schemas used to resolve `Schema::Ref` references.
    names: &'s NamesRef<'s>,
    /// Namespace applied to referenced names that carry no namespace of their own.
    enclosing_namespace: Namespace,
}
437
438impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
439 pub fn new(
449 writer: &'s mut W,
450 schema: &'s Schema,
451 names: &'s NamesRef<'s>,
452 enclosing_namespace: Namespace,
453 ) -> SchemaAwareWriteSerializer<'s, W> {
454 SchemaAwareWriteSerializer {
455 writer,
456 root_schema: schema,
457 names,
458 enclosing_namespace,
459 }
460 }
461
462 fn get_ref_schema(&self, name: &'s Name) -> Result<&'s Schema, Error> {
463 let full_name = match name.namespace {
464 Some(_) => Cow::Borrowed(name),
465 None => Cow::Owned(Name {
466 name: name.name.clone(),
467 namespace: self.enclosing_namespace.clone(),
468 }),
469 };
470
471 let ref_schema = self.names.get(full_name.as_ref()).copied();
472
473 ref_schema.ok_or_else(|| Details::SchemaResolutionError(full_name.as_ref().clone()).into())
474 }
475
476 fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, Error> {
477 let mut bytes_written: usize = 0;
478
479 bytes_written += encode_long(bytes.len() as i64, &mut self.writer)?;
480 bytes_written += self.writer.write(bytes).map_err(Details::WriteBytes)?;
481
482 Ok(bytes_written)
483 }
484
485 fn serialize_bool_with_schema(&mut self, value: bool, schema: &Schema) -> Result<usize, Error> {
486 let create_error = |cause: String| {
487 Error::new(Details::SerializeValueWithSchema {
488 value_type: "bool",
489 value: format!("{value}. Cause: {cause}"),
490 schema: schema.clone(),
491 })
492 };
493
494 match schema {
495 Schema::Boolean => self
496 .writer
497 .write(&[u8::from(value)])
498 .map_err(|e| Details::WriteBytes(e).into()),
499 Schema::Union(union_schema) => {
500 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
501 match variant_schema {
502 Schema::Boolean => {
503 encode_int(i as i32, &mut *self.writer)?;
504 return self.serialize_bool_with_schema(value, variant_schema);
505 }
506 _ => { }
507 }
508 }
509 Err(create_error(format!(
510 "No matching Schema::Bool found in {:?}",
511 union_schema.schemas
512 )))
513 }
514 expected => Err(create_error(format!("Expected {expected}. Got: Bool"))),
515 }
516 }
517
518 fn serialize_i32_with_schema(&mut self, value: i32, schema: &Schema) -> Result<usize, Error> {
519 let create_error = |cause: String| {
520 Error::new(Details::SerializeValueWithSchema {
521 value_type: "int (i8 | i16 | i32)",
522 value: format!("{value}. Cause: {cause}"),
523 schema: schema.clone(),
524 })
525 };
526
527 match schema {
528 Schema::Int | Schema::TimeMillis | Schema::Date => encode_int(value, &mut self.writer),
529 Schema::Long
530 | Schema::TimeMicros
531 | Schema::TimestampMillis
532 | Schema::TimestampMicros
533 | Schema::TimestampNanos
534 | Schema::LocalTimestampMillis
535 | Schema::LocalTimestampMicros
536 | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
537 Schema::Union(union_schema) => {
538 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
539 match variant_schema {
540 Schema::Int
541 | Schema::TimeMillis
542 | Schema::Date
543 | Schema::Long
544 | Schema::TimeMicros
545 | Schema::TimestampMillis
546 | Schema::TimestampMicros
547 | Schema::TimestampNanos
548 | Schema::LocalTimestampMillis
549 | Schema::LocalTimestampMicros
550 | Schema::LocalTimestampNanos => {
551 encode_int(i as i32, &mut *self.writer)?;
552 return self.serialize_i32_with_schema(value, variant_schema);
553 }
554 _ => { }
555 }
556 }
557 Err(create_error(format!(
558 "Cannot find a matching int-like schema in {union_schema:?}"
559 )))
560 }
561 expected => Err(create_error(format!("Expected {expected}. Got: Int/Long"))),
562 }
563 }
564
565 fn serialize_i64_with_schema(&mut self, value: i64, schema: &Schema) -> Result<usize, Error> {
566 let create_error = |cause: String| {
567 Error::new(Details::SerializeValueWithSchema {
568 value_type: "i64",
569 value: format!("{value}. Cause: {cause}"),
570 schema: schema.clone(),
571 })
572 };
573
574 match schema {
575 Schema::Int | Schema::TimeMillis | Schema::Date => {
576 let int_value =
577 i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
578 encode_int(int_value, &mut self.writer)
579 }
580 Schema::Long
581 | Schema::TimeMicros
582 | Schema::TimestampMillis
583 | Schema::TimestampMicros
584 | Schema::TimestampNanos
585 | Schema::LocalTimestampMillis
586 | Schema::LocalTimestampMicros
587 | Schema::LocalTimestampNanos => encode_long(value, &mut self.writer),
588 Schema::Union(union_schema) => {
589 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
590 match variant_schema {
591 Schema::Int
592 | Schema::TimeMillis
593 | Schema::Date
594 | Schema::Long
595 | Schema::TimeMicros
596 | Schema::TimestampMillis
597 | Schema::TimestampMicros
598 | Schema::TimestampNanos
599 | Schema::LocalTimestampMillis
600 | Schema::LocalTimestampMicros
601 | Schema::LocalTimestampNanos => {
602 encode_int(i as i32, &mut *self.writer)?;
603 return self.serialize_i64_with_schema(value, variant_schema);
604 }
605 _ => { }
606 }
607 }
608 Err(create_error(format!(
609 "Cannot find a matching int/long-like schema in {:?}",
610 union_schema.schemas
611 )))
612 }
613 expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))),
614 }
615 }
616
617 fn serialize_u8_with_schema(&mut self, value: u8, schema: &Schema) -> Result<usize, Error> {
618 let create_error = |cause: String| {
619 Error::new(Details::SerializeValueWithSchema {
620 value_type: "u8",
621 value: format!("{value}. Cause: {cause}"),
622 schema: schema.clone(),
623 })
624 };
625
626 match schema {
627 Schema::Int | Schema::TimeMillis | Schema::Date => {
628 encode_int(value as i32, &mut self.writer)
629 }
630 Schema::Long
631 | Schema::TimeMicros
632 | Schema::TimestampMillis
633 | Schema::TimestampMicros
634 | Schema::TimestampNanos
635 | Schema::LocalTimestampMillis
636 | Schema::LocalTimestampMicros
637 | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
638 Schema::Bytes => self.write_bytes(&[value]),
639 Schema::Union(union_schema) => {
640 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
641 match variant_schema {
642 Schema::Int
643 | Schema::TimeMillis
644 | Schema::Date
645 | Schema::Long
646 | Schema::TimeMicros
647 | Schema::TimestampMillis
648 | Schema::TimestampMicros
649 | Schema::TimestampNanos
650 | Schema::LocalTimestampMillis
651 | Schema::LocalTimestampMicros
652 | Schema::LocalTimestampNanos
653 | Schema::Bytes => {
654 encode_int(i as i32, &mut *self.writer)?;
655 return self.serialize_u8_with_schema(value, variant_schema);
656 }
657 _ => { }
658 }
659 }
660 Err(create_error(format!(
661 "Cannot find a matching Int-like, Long-like or Bytes schema in {union_schema:?}"
662 )))
663 }
664 expected => Err(create_error(format!("Expected: {expected}. Got: Int"))),
665 }
666 }
667
668 fn serialize_u32_with_schema(&mut self, value: u32, schema: &Schema) -> Result<usize, Error> {
669 let create_error = |cause: String| {
670 Error::new(Details::SerializeValueWithSchema {
671 value_type: "unsigned int (u16 | u32)",
672 value: format!("{value}. Cause: {cause}"),
673 schema: schema.clone(),
674 })
675 };
676
677 match schema {
678 Schema::Int | Schema::TimeMillis | Schema::Date => {
679 let int_value =
680 i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
681 encode_int(int_value, &mut self.writer)
682 }
683 Schema::Long
684 | Schema::TimeMicros
685 | Schema::TimestampMillis
686 | Schema::TimestampMicros
687 | Schema::TimestampNanos
688 | Schema::LocalTimestampMillis
689 | Schema::LocalTimestampMicros
690 | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
691 Schema::Union(union_schema) => {
692 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
693 match variant_schema {
694 Schema::Int
695 | Schema::TimeMillis
696 | Schema::Date
697 | Schema::Long
698 | Schema::TimeMicros
699 | Schema::TimestampMillis
700 | Schema::TimestampMicros
701 | Schema::TimestampNanos
702 | Schema::LocalTimestampMillis
703 | Schema::LocalTimestampMicros
704 | Schema::LocalTimestampNanos => {
705 encode_int(i as i32, &mut *self.writer)?;
706 return self.serialize_u32_with_schema(value, variant_schema);
707 }
708 _ => { }
709 }
710 }
711 Err(create_error(format!(
712 "Cannot find a matching Int-like or Long-like schema in {union_schema:?}"
713 )))
714 }
715 expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))),
716 }
717 }
718
719 fn serialize_u64_with_schema(&mut self, value: u64, schema: &Schema) -> Result<usize, Error> {
720 let create_error = |cause: String| {
721 Error::new(Details::SerializeValueWithSchema {
722 value_type: "u64",
723 value: format!("{value}. Cause: {cause}"),
724 schema: schema.clone(),
725 })
726 };
727
728 match schema {
729 Schema::Int | Schema::TimeMillis | Schema::Date => {
730 let int_value =
731 i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
732 encode_int(int_value, &mut self.writer)
733 }
734 Schema::Long
735 | Schema::TimeMicros
736 | Schema::TimestampMillis
737 | Schema::TimestampMicros
738 | Schema::TimestampNanos
739 | Schema::LocalTimestampMillis
740 | Schema::LocalTimestampMicros
741 | Schema::LocalTimestampNanos => {
742 let long_value =
743 i64::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
744 encode_long(long_value, &mut self.writer)
745 }
746 Schema::Union(union_schema) => {
747 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
748 match variant_schema {
749 Schema::Int
750 | Schema::TimeMillis
751 | Schema::Date
752 | Schema::Long
753 | Schema::TimeMicros
754 | Schema::TimestampMillis
755 | Schema::TimestampMicros
756 | Schema::TimestampNanos
757 | Schema::LocalTimestampMillis
758 | Schema::LocalTimestampMicros
759 | Schema::LocalTimestampNanos => {
760 encode_int(i as i32, &mut *self.writer)?;
761 return self.serialize_u64_with_schema(value, variant_schema);
762 }
763 _ => { }
764 }
765 }
766 Err(create_error(format!(
767 "Cannot find a matching Int-like or Long-like schema in {:?}",
768 union_schema.schemas
769 )))
770 }
771 expected => Err(create_error(format!("Expected {expected}. Got: Int/Long"))),
772 }
773 }
774
775 fn serialize_f32_with_schema(&mut self, value: f32, schema: &Schema) -> Result<usize, Error> {
776 let create_error = |cause: String| {
777 Error::new(Details::SerializeValueWithSchema {
778 value_type: "f32",
779 value: format!("{value}. Cause: {cause}"),
780 schema: schema.clone(),
781 })
782 };
783
784 match schema {
785 Schema::Float => self
786 .writer
787 .write(&value.to_le_bytes())
788 .map_err(|e| Details::WriteBytes(e).into()),
789 Schema::Double => self
790 .writer
791 .write(&(value as f64).to_le_bytes())
792 .map_err(|e| Details::WriteBytes(e).into()),
793 Schema::Union(union_schema) => {
794 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
795 match variant_schema {
796 Schema::Float | Schema::Double => {
797 encode_int(i as i32, &mut *self.writer)?;
798 return self.serialize_f32_with_schema(value, variant_schema);
799 }
800 _ => { }
801 }
802 }
803 Err(create_error(format!(
804 "Cannot find a Float schema in {:?}",
805 union_schema.schemas
806 )))
807 }
808 expected => Err(create_error(format!("Expected: {expected}. Got: Float"))),
809 }
810 }
811
812 fn serialize_f64_with_schema(&mut self, value: f64, schema: &Schema) -> Result<usize, Error> {
813 let create_error = |cause: String| {
814 Error::new(Details::SerializeValueWithSchema {
815 value_type: "f64",
816 value: format!("{value}. Cause: {cause}"),
817 schema: schema.clone(),
818 })
819 };
820
821 match schema {
822 Schema::Float => self
823 .writer
824 .write(&(value as f32).to_le_bytes())
825 .map_err(|e| Details::WriteBytes(e).into()),
826 Schema::Double => self
827 .writer
828 .write(&value.to_le_bytes())
829 .map_err(|e| Details::WriteBytes(e).into()),
830 Schema::Union(union_schema) => {
831 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
832 match variant_schema {
833 Schema::Float | Schema::Double => {
834 encode_int(i as i32, &mut *self.writer)?;
835 return self.serialize_f64_with_schema(value, variant_schema);
836 }
837 _ => { }
838 }
839 }
840 Err(create_error(format!(
841 "Cannot find a Double schema in {:?}",
842 union_schema.schemas
843 )))
844 }
845 expected => Err(create_error(format!("Expected: {expected}. Got: Double"))),
846 }
847 }
848
849 fn serialize_char_with_schema(&mut self, value: char, schema: &Schema) -> Result<usize, Error> {
850 let create_error = |cause: String| {
851 Error::new(Details::SerializeValueWithSchema {
852 value_type: "char",
853 value: format!("{value}. Cause: {cause}"),
854 schema: schema.clone(),
855 })
856 };
857
858 match schema {
859 Schema::String | Schema::Bytes => self.write_bytes(String::from(value).as_bytes()),
860 Schema::Union(union_schema) => {
861 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
862 match variant_schema {
863 Schema::String | Schema::Bytes => {
864 encode_int(i as i32, &mut *self.writer)?;
865 return self.serialize_char_with_schema(value, variant_schema);
866 }
867 _ => { }
868 }
869 }
870 Err(create_error(format!(
871 "Cannot find a matching String or Bytes schema in {union_schema:?}"
872 )))
873 }
874 expected => Err(create_error(format!("Expected {expected}. Got: char"))),
875 }
876 }
877
878 fn serialize_str_with_schema(&mut self, value: &str, schema: &Schema) -> Result<usize, Error> {
879 let create_error = |cause: String| {
880 Error::new(Details::SerializeValueWithSchema {
881 value_type: "string",
882 value: format!("{value}. Cause: {cause}"),
883 schema: schema.clone(),
884 })
885 };
886
887 match schema {
888 Schema::String | Schema::Bytes | Schema::Uuid => self.write_bytes(value.as_bytes()),
889 Schema::BigDecimal => {
890 let decimal_val =
892 BigDecimal::from_str(value).map_err(|e| create_error(e.to_string()))?;
893 let decimal_bytes = big_decimal_as_bytes(&decimal_val)?;
894 self.write_bytes(decimal_bytes.as_slice())
895 }
896 Schema::Fixed(fixed_schema) => {
897 if value.len() == fixed_schema.size {
898 self.writer
899 .write(value.as_bytes())
900 .map_err(|e| Details::WriteBytes(e).into())
901 } else {
902 Err(create_error(format!(
903 "Fixed schema size ({}) does not match the value length ({})",
904 fixed_schema.size,
905 value.len()
906 )))
907 }
908 }
909 Schema::Ref { name } => {
910 let ref_schema = self.get_ref_schema(name)?;
911 self.serialize_str_with_schema(value, ref_schema)
912 }
913 Schema::Union(union_schema) => {
914 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
915 match variant_schema {
916 Schema::String
917 | Schema::Bytes
918 | Schema::Uuid
919 | Schema::Fixed(_)
920 | Schema::Ref { name: _ } => {
921 encode_int(i as i32, &mut *self.writer)?;
922 return self.serialize_str_with_schema(value, variant_schema);
923 }
924 _ => { }
925 }
926 }
927 Err(create_error(format!(
928 "Expected one of the union variants {:?}. Got: String",
929 union_schema.schemas
930 )))
931 }
932 expected => Err(create_error(format!("Expected: {expected}. Got: String"))),
933 }
934 }
935
936 fn serialize_bytes_with_schema(
937 &mut self,
938 value: &[u8],
939 schema: &Schema,
940 ) -> Result<usize, Error> {
941 let create_error = |cause: String| {
942 use std::fmt::Write;
943 let mut v_str = String::with_capacity(value.len());
944 for b in value {
945 if write!(&mut v_str, "{b:x}").is_err() {
946 v_str.push_str("??");
947 }
948 }
949 Error::new(Details::SerializeValueWithSchema {
950 value_type: "bytes",
951 value: format!("{v_str}. Cause: {cause}"),
952 schema: schema.clone(),
953 })
954 };
955
956 match schema {
957 Schema::String | Schema::Bytes | Schema::Uuid | Schema::BigDecimal => {
958 self.write_bytes(value)
959 }
960 Schema::Fixed(fixed_schema) => {
961 if value.len() == fixed_schema.size {
962 self.writer
963 .write(value)
964 .map_err(|e| Details::WriteBytes(e).into())
965 } else {
966 Err(create_error(format!(
967 "Fixed schema size ({}) does not match the value length ({})",
968 fixed_schema.size,
969 value.len()
970 )))
971 }
972 }
973 Schema::Duration => {
974 if value.len() == 12 {
975 self.writer
976 .write(value)
977 .map_err(|e| Details::WriteBytes(e).into())
978 } else {
979 Err(create_error(format!(
980 "Duration length must be 12! Got ({})",
981 value.len()
982 )))
983 }
984 }
985 Schema::Decimal(decimal_schema) => match decimal_schema.inner.as_ref() {
986 Schema::Bytes => self.write_bytes(value),
987 Schema::Fixed(fixed_schema) => match fixed_schema.size.checked_sub(value.len()) {
988 Some(pad) => {
989 let pad_val = match value.len() {
990 0 => 0,
991 _ => value[0],
992 };
993 let padding = vec![pad_val; pad];
994 self.writer
995 .write(padding.as_slice())
996 .map_err(Details::WriteBytes)?;
997 self.writer
998 .write(value)
999 .map_err(|e| Details::WriteBytes(e).into())
1000 }
1001 None => Err(Details::CompareFixedSizes {
1002 size: fixed_schema.size,
1003 n: value.len(),
1004 }
1005 .into()),
1006 },
1007 unsupported => Err(create_error(format!(
1008 "Decimal schema's inner should be Bytes or Fixed schema. Got: {unsupported}"
1009 ))),
1010 },
1011 Schema::Ref { name } => {
1012 let ref_schema = self.get_ref_schema(name)?;
1013 self.serialize_bytes_with_schema(value, ref_schema)
1014 }
1015 Schema::Union(union_schema) => {
1016 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1017 match variant_schema {
1018 Schema::String
1019 | Schema::Bytes
1020 | Schema::Uuid
1021 | Schema::BigDecimal
1022 | Schema::Fixed(_)
1023 | Schema::Duration
1024 | Schema::Decimal(_)
1025 | Schema::Ref { name: _ } => {
1026 encode_int(i as i32, &mut *self.writer)?;
1027 return self.serialize_bytes_with_schema(value, variant_schema);
1028 }
1029 _ => { }
1030 }
1031 }
1032 Err(create_error(format!(
1033 "Cannot find a matching String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal or Ref schema in {union_schema:?}"
1034 )))
1035 }
1036 unsupported => Err(create_error(format!(
1037 "Expected String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal, Ref or Union schema. Got: {unsupported}"
1038 ))),
1039 }
1040 }
1041
1042 fn serialize_none_with_schema(&mut self, schema: &Schema) -> Result<usize, Error> {
1043 let create_error = |cause: String| {
1044 Error::new(Details::SerializeValueWithSchema {
1045 value_type: "none",
1046 value: format!("None. Cause: {cause}"),
1047 schema: schema.clone(),
1048 })
1049 };
1050
1051 match schema {
1052 Schema::Null => Ok(0),
1053 Schema::Union(union_schema) => {
1054 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1055 match variant_schema {
1056 Schema::Null => {
1057 return encode_int(i as i32, &mut *self.writer);
1058 }
1059 _ => { }
1060 }
1061 }
1062 Err(create_error(format!(
1063 "Cannot find a matching Null schema in {:?}",
1064 union_schema.schemas
1065 )))
1066 }
1067 expected => Err(create_error(format!("Expected: {expected}. Got: Null"))),
1068 }
1069 }
1070
1071 fn serialize_some_with_schema<T>(&mut self, value: &T, schema: &Schema) -> Result<usize, Error>
1072 where
1073 T: ?Sized + ser::Serialize,
1074 {
1075 let create_error = |cause: String| {
1076 Error::new(Details::SerializeValueWithSchema {
1077 value_type: "some",
1078 value: format!("Some(?). Cause: {cause}"),
1079 schema: schema.clone(),
1080 })
1081 };
1082
1083 match schema {
1084 Schema::Union(union_schema) => {
1085 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1086 match variant_schema {
1087 Schema::Null => { }
1088 _ => {
1089 encode_long(i as i64, &mut *self.writer)?;
1090 let mut variant_ser = SchemaAwareWriteSerializer::new(
1091 &mut *self.writer,
1092 variant_schema,
1093 self.names,
1094 self.enclosing_namespace.clone(),
1095 );
1096 return value.serialize(&mut variant_ser);
1097 }
1098 }
1099 }
1100 Err(create_error(format!(
1101 "Cannot find a matching Null schema in {:?}",
1102 union_schema.schemas
1103 )))
1104 }
1105 _ => value.serialize(self),
1106 }
1107 }
1108
1109 fn serialize_unit_struct_with_schema(
1110 &mut self,
1111 name: &'static str,
1112 schema: &Schema,
1113 ) -> Result<usize, Error> {
1114 let create_error = |cause: String| {
1115 Error::new(Details::SerializeValueWithSchema {
1116 value_type: "unit struct",
1117 value: format!("{name}. Cause: {cause}"),
1118 schema: schema.clone(),
1119 })
1120 };
1121
1122 match schema {
1123 Schema::Record(sch) => match sch.fields.len() {
1124 0 => Ok(0),
1125 too_many => Err(create_error(format!(
1126 "Too many fields: {too_many}. Expected: 0"
1127 ))),
1128 },
1129 Schema::Null => Ok(0),
1130 Schema::Ref { name: ref_name } => {
1131 let ref_schema = self.get_ref_schema(ref_name)?;
1132 self.serialize_unit_struct_with_schema(name, ref_schema)
1133 }
1134 Schema::Union(union_schema) => {
1135 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1136 match variant_schema {
1137 Schema::Record(record_schema) if record_schema.fields.is_empty() => {
1138 encode_int(i as i32, &mut *self.writer)?;
1139 return self.serialize_unit_struct_with_schema(name, variant_schema);
1140 }
1141 Schema::Null | Schema::Ref { name: _ } => {
1142 encode_int(i as i32, &mut *self.writer)?;
1143 return self.serialize_unit_struct_with_schema(name, variant_schema);
1144 }
1145 _ => { }
1146 }
1147 }
1148 Err(create_error(format!(
1149 "Cannot find a matching Null schema in {union_schema:?}"
1150 )))
1151 }
1152 unsupported => Err(create_error(format!(
1153 "Expected Null or Union schema. Got: {unsupported}"
1154 ))),
1155 }
1156 }
1157
1158 fn serialize_unit_variant_with_schema(
1159 &mut self,
1160 name: &'static str,
1161 variant_index: u32,
1162 variant: &'static str,
1163 schema: &Schema,
1164 ) -> Result<usize, Error> {
1165 let create_error = |cause: String| {
1166 Error::new(Details::SerializeValueWithSchema {
1167 value_type: "unit variant",
1168 value: format!("{name}::{variant} (index={variant_index}). Cause: {cause}"),
1169 schema: schema.clone(),
1170 })
1171 };
1172
1173 match schema {
1174 Schema::Enum(enum_schema) => {
1175 if variant_index as usize >= enum_schema.symbols.len() {
1176 return Err(create_error(format!(
1177 "Variant index out of bounds: {}. The Enum schema has '{}' symbols",
1178 variant_index,
1179 enum_schema.symbols.len()
1180 )));
1181 }
1182
1183 encode_int(variant_index as i32, &mut self.writer)
1184 }
1185 Schema::Union(union_schema) => {
1186 if variant_index as usize >= union_schema.schemas.len() {
1187 return Err(create_error(format!(
1188 "Variant index out of bounds: {}. The union schema has '{}' schemas",
1189 variant_index,
1190 union_schema.schemas.len()
1191 )));
1192 }
1193
1194 encode_int(variant_index as i32, &mut self.writer)?;
1195 self.serialize_unit_struct_with_schema(
1196 name,
1197 &union_schema.schemas[variant_index as usize],
1198 )
1199 }
1200 Schema::Ref { name: ref_name } => {
1201 let ref_schema = self.get_ref_schema(ref_name)?;
1202 self.serialize_unit_variant_with_schema(name, variant_index, variant, ref_schema)
1203 }
1204 unsupported => Err(create_error(format!(
1205 "Unsupported schema: {unsupported:?}. Expected: Enum, Union or Ref"
1206 ))),
1207 }
1208 }
1209
1210 fn serialize_newtype_struct_with_schema<T>(
1211 &mut self,
1212 _name: &'static str,
1213 value: &T,
1214 schema: &Schema,
1215 ) -> Result<usize, Error>
1216 where
1217 T: ?Sized + ser::Serialize,
1218 {
1219 let mut inner_ser = SchemaAwareWriteSerializer::new(
1220 &mut *self.writer,
1221 schema,
1222 self.names,
1223 self.enclosing_namespace.clone(),
1224 );
1225 value.serialize(&mut inner_ser)
1227 }
1228
1229 fn serialize_newtype_variant_with_schema<T>(
1230 &mut self,
1231 name: &'static str,
1232 variant_index: u32,
1233 variant: &'static str,
1234 value: &T,
1235 schema: &Schema,
1236 ) -> Result<usize, Error>
1237 where
1238 T: ?Sized + ser::Serialize,
1239 {
1240 let create_error = |cause: String| {
1241 Error::new(Details::SerializeValueWithSchema {
1242 value_type: "newtype variant",
1243 value: format!("{name}::{variant}(?) (index={variant_index}). Cause: {cause}"),
1244 schema: schema.clone(),
1245 })
1246 };
1247
1248 match schema {
1249 Schema::Union(union_schema) => {
1250 let variant_schema = union_schema
1251 .schemas
1252 .get(variant_index as usize)
1253 .ok_or_else(|| {
1254 create_error(format!(
1255 "No variant schema at position {variant_index} for {union_schema:?}"
1256 ))
1257 })?;
1258
1259 encode_int(variant_index as i32, &mut self.writer)?;
1260 self.serialize_newtype_struct_with_schema(variant, value, variant_schema)
1261 }
1262 _ => Err(create_error(format!(
1263 "Expected Union schema. Got: {schema}"
1264 ))),
1265 }
1266 }
1267
1268 fn serialize_seq_with_schema<'a>(
1269 &'a mut self,
1270 len: Option<usize>,
1271 schema: &'s Schema,
1272 ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> {
1273 let create_error = |cause: String| {
1274 let len_str = len
1275 .map(|l| format!("{l}"))
1276 .unwrap_or_else(|| String::from("?"));
1277
1278 Error::new(Details::SerializeValueWithSchema {
1279 value_type: "sequence",
1280 value: format!("sequence (len={len_str}). Cause: {cause}"),
1281 schema: schema.clone(),
1282 })
1283 };
1284
1285 match schema {
1286 Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new(
1287 self,
1288 array_schema.items.as_ref(),
1289 len,
1290 )),
1291 Schema::Union(union_schema) => {
1292 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1293 match variant_schema {
1294 Schema::Array(_) => {
1295 encode_int(i as i32, &mut *self.writer)?;
1296 return self.serialize_seq_with_schema(len, variant_schema);
1297 }
1298 _ => { }
1299 }
1300 }
1301 Err(create_error(format!(
1302 "Expected Array schema in {union_schema:?}"
1303 )))
1304 }
1305 _ => Err(create_error(format!("Expected: {schema}. Got: Array"))),
1306 }
1307 }
1308
1309 fn serialize_tuple_with_schema<'a>(
1310 &'a mut self,
1311 len: usize,
1312 schema: &'s Schema,
1313 ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> {
1314 let create_error = |cause: String| {
1315 Error::new(Details::SerializeValueWithSchema {
1316 value_type: "tuple",
1317 value: format!("tuple (len={len}). Cause: {cause}"),
1318 schema: schema.clone(),
1319 })
1320 };
1321
1322 match schema {
1323 Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new(
1324 self,
1325 array_schema.items.as_ref(),
1326 Some(len),
1327 )),
1328 Schema::Union(union_schema) => {
1329 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1330 match variant_schema {
1331 Schema::Array(_) => {
1332 encode_int(i as i32, &mut *self.writer)?;
1333 return self.serialize_tuple_with_schema(len, variant_schema);
1334 }
1335 _ => { }
1336 }
1337 }
1338 Err(create_error(format!(
1339 "Expected Array schema in {union_schema:?}"
1340 )))
1341 }
1342 _ => Err(create_error(format!("Expected: {schema}. Got: Array"))),
1343 }
1344 }
1345
1346 fn serialize_tuple_struct_with_schema<'a>(
1347 &'a mut self,
1348 name: &'static str,
1349 len: usize,
1350 schema: &'s Schema,
1351 ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> {
1352 let create_error = |cause: String| {
1353 Error::new(Details::SerializeValueWithSchema {
1354 value_type: "tuple struct",
1355 value: format!(
1356 "{name}({}). Cause: {cause}",
1357 vec!["?"; len].as_slice().join(",")
1358 ),
1359 schema: schema.clone(),
1360 })
1361 };
1362
1363 match schema {
1364 Schema::Array(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Array(
1365 SchemaAwareWriteSerializeSeq::new(self, &sch.items, Some(len)),
1366 )),
1367 Schema::Record(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Record(
1368 SchemaAwareWriteSerializeStruct::new(self, sch),
1369 )),
1370 Schema::Ref { name: ref_name } => {
1371 let ref_schema = self.get_ref_schema(ref_name)?;
1372 self.serialize_tuple_struct_with_schema(name, len, ref_schema)
1373 }
1374 Schema::Union(union_schema) => {
1375 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1376 match variant_schema {
1377 Schema::Record(inner) => {
1378 if inner.fields.len() == len {
1379 encode_int(i as i32, &mut *self.writer)?;
1380 return self.serialize_tuple_struct_with_schema(
1381 name,
1382 len,
1383 variant_schema,
1384 );
1385 }
1386 }
1387 Schema::Array(_) | Schema::Ref { name: _ } => {
1388 encode_int(i as i32, &mut *self.writer)?;
1389 return self.serialize_tuple_struct_with_schema(
1390 name,
1391 len,
1392 variant_schema,
1393 );
1394 }
1395 _ => { }
1396 }
1397 }
1398 Err(create_error(format!(
1399 "Expected Record, Array or Ref schema in {union_schema:?}"
1400 )))
1401 }
1402 _ => Err(create_error(format!(
1403 "Expected Record, Array, Ref or Union schema. Got: {schema}"
1404 ))),
1405 }
1406 }
1407
1408 fn serialize_tuple_variant_with_schema<'a>(
1409 &'a mut self,
1410 name: &'static str,
1411 variant_index: u32,
1412 variant: &'static str,
1413 len: usize,
1414 schema: &'s Schema,
1415 ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> {
1416 let create_error = |cause: String| {
1417 Error::new(Details::SerializeValueWithSchema {
1418 value_type: "tuple variant",
1419 value: format!(
1420 "{name}::{variant}({}) (index={variant_index}). Cause: {cause}",
1421 vec!["?"; len].as_slice().join(",")
1422 ),
1423 schema: schema.clone(),
1424 })
1425 };
1426
1427 match schema {
1428 Schema::Union(union_schema) => {
1429 let variant_schema = union_schema
1430 .schemas
1431 .get(variant_index as usize)
1432 .ok_or_else(|| {
1433 create_error(format!(
1434 "Cannot find a variant at position {variant_index} in {union_schema:?}"
1435 ))
1436 })?;
1437
1438 encode_int(variant_index as i32, &mut self.writer)?;
1439 self.serialize_tuple_struct_with_schema(variant, len, variant_schema)
1440 }
1441 _ => Err(create_error(format!(
1442 "Expected Union schema. Got: {schema}"
1443 ))),
1444 }
1445 }
1446
1447 fn serialize_map_with_schema<'a>(
1448 &'a mut self,
1449 len: Option<usize>,
1450 schema: &'s Schema,
1451 ) -> Result<SchemaAwareWriteSerializeMap<'a, 's, W>, Error> {
1452 let create_error = |cause: String| {
1453 let len_str = len
1454 .map(|l| format!("{l}"))
1455 .unwrap_or_else(|| String::from("?"));
1456
1457 Error::new(Details::SerializeValueWithSchema {
1458 value_type: "map",
1459 value: format!("map (size={len_str}). Cause: {cause}"),
1460 schema: schema.clone(),
1461 })
1462 };
1463
1464 match schema {
1465 Schema::Map(map_schema) => Ok(SchemaAwareWriteSerializeMap::new(
1466 self,
1467 map_schema.types.as_ref(),
1468 len,
1469 )),
1470 Schema::Union(union_schema) => {
1471 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1472 match variant_schema {
1473 Schema::Map(_) => {
1474 encode_int(i as i32, &mut *self.writer)?;
1475 return self.serialize_map_with_schema(len, variant_schema);
1476 }
1477 _ => { }
1478 }
1479 }
1480 Err(create_error(format!(
1481 "Expected a Map schema in {union_schema:?}"
1482 )))
1483 }
1484 _ => Err(create_error(format!(
1485 "Expected Map or Union schema. Got: {schema}"
1486 ))),
1487 }
1488 }
1489
1490 fn serialize_struct_with_schema<'a>(
1491 &'a mut self,
1492 name: &'static str,
1493 len: usize,
1494 schema: &'s Schema,
1495 ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> {
1496 let create_error = |cause: String| {
1497 Error::new(Details::SerializeValueWithSchema {
1498 value_type: "struct",
1499 value: format!("{name}{{ ... }}. Cause: {cause}"),
1500 schema: schema.clone(),
1501 })
1502 };
1503
1504 match schema {
1505 Schema::Record(record_schema) => {
1506 Ok(SchemaAwareWriteSerializeStruct::new(self, record_schema))
1507 }
1508 Schema::Ref { name: ref_name } => {
1509 let ref_schema = self.get_ref_schema(ref_name)?;
1510 self.serialize_struct_with_schema(name, len, ref_schema)
1511 }
1512 Schema::Union(union_schema) => {
1513 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1514 match variant_schema {
1515 Schema::Record(inner)
1516 if inner.fields.len() == len && inner.name.name == name =>
1517 {
1518 encode_int(i as i32, &mut *self.writer)?;
1519 return self.serialize_struct_with_schema(name, len, variant_schema);
1520 }
1521 Schema::Ref { name: _ } => {
1522 encode_int(i as i32, &mut *self.writer)?;
1523 return self.serialize_struct_with_schema(name, len, variant_schema);
1524 }
1525 _ => { }
1526 }
1527 }
1528 Err(create_error(format!(
1529 "Expected Record or Ref schema in {union_schema:?}"
1530 )))
1531 }
1532 _ => Err(create_error(format!(
1533 "Expected Record, Ref or Union schema. Got: {schema}"
1534 ))),
1535 }
1536 }
1537
1538 fn serialize_struct_variant_with_schema<'a>(
1539 &'a mut self,
1540 name: &'static str,
1541 variant_index: u32,
1542 variant: &'static str,
1543 len: usize,
1544 schema: &'s Schema,
1545 ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> {
1546 let create_error = |cause: String| {
1547 Error::new(Details::SerializeValueWithSchema {
1548 value_type: "struct variant",
1549 value: format!("{name}::{variant}{{ ... }} (size={len}. Cause: {cause})"),
1550 schema: schema.clone(),
1551 })
1552 };
1553
1554 match schema {
1555 Schema::Union(union_schema) => {
1556 let variant_schema = union_schema
1557 .schemas
1558 .get(variant_index as usize)
1559 .ok_or_else(|| {
1560 create_error(format!(
1561 "Cannot find variant at position {variant_index} in {union_schema:?}"
1562 ))
1563 })?;
1564
1565 encode_int(variant_index as i32, &mut self.writer)?;
1566 self.serialize_struct_with_schema(variant, len, variant_schema)
1567 }
1568 _ => Err(create_error(format!(
1569 "Expected Union schema. Got: {schema}"
1570 ))),
1571 }
1572 }
1573}
1574
1575impl<'a, 's, W: Write> ser::Serializer for &'a mut SchemaAwareWriteSerializer<'s, W> {
1576 type Ok = usize;
1577 type Error = Error;
1578 type SerializeSeq = SchemaAwareWriteSerializeSeq<'a, 's, W>;
1579 type SerializeTuple = SchemaAwareWriteSerializeSeq<'a, 's, W>;
1580 type SerializeTupleStruct = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>;
1581 type SerializeTupleVariant = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>;
1582 type SerializeMap = SchemaAwareWriteSerializeMap<'a, 's, W>;
1583 type SerializeStruct = SchemaAwareWriteSerializeStruct<'a, 's, W>;
1584 type SerializeStructVariant = SchemaAwareWriteSerializeStruct<'a, 's, W>;
1585
1586 fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
1587 self.serialize_bool_with_schema(v, self.root_schema)
1588 }
1589
1590 fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
1591 self.serialize_i32(v as i32)
1592 }
1593
1594 fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
1595 self.serialize_i32(v as i32)
1596 }
1597
1598 fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
1599 self.serialize_i32_with_schema(v, self.root_schema)
1600 }
1601
1602 fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
1603 self.serialize_i64_with_schema(v, self.root_schema)
1604 }
1605
1606 fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
1607 self.serialize_u8_with_schema(v, self.root_schema)
1608 }
1609
1610 fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
1611 self.serialize_u32(v as u32)
1612 }
1613
1614 fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
1615 self.serialize_u32_with_schema(v, self.root_schema)
1616 }
1617
1618 fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
1619 self.serialize_u64_with_schema(v, self.root_schema)
1620 }
1621
1622 fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
1623 self.serialize_f32_with_schema(v, self.root_schema)
1624 }
1625
1626 fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
1627 self.serialize_f64_with_schema(v, self.root_schema)
1628 }
1629
1630 fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> {
1631 self.serialize_char_with_schema(v, self.root_schema)
1632 }
1633
1634 fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
1635 self.serialize_str_with_schema(v, self.root_schema)
1636 }
1637
1638 fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
1639 self.serialize_bytes_with_schema(v, self.root_schema)
1640 }
1641
1642 fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
1643 self.serialize_none_with_schema(self.root_schema)
1644 }
1645
1646 fn serialize_some<T>(self, value: &T) -> Result<Self::Ok, Self::Error>
1647 where
1648 T: ?Sized + ser::Serialize,
1649 {
1650 self.serialize_some_with_schema(value, self.root_schema)
1651 }
1652
1653 fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
1654 self.serialize_none()
1655 }
1656
1657 fn serialize_unit_struct(self, name: &'static str) -> Result<Self::Ok, Self::Error> {
1658 self.serialize_unit_struct_with_schema(name, self.root_schema)
1659 }
1660
1661 fn serialize_unit_variant(
1662 self,
1663 name: &'static str,
1664 variant_index: u32,
1665 variant: &'static str,
1666 ) -> Result<Self::Ok, Self::Error> {
1667 self.serialize_unit_variant_with_schema(name, variant_index, variant, self.root_schema)
1668 }
1669
1670 fn serialize_newtype_struct<T>(
1671 self,
1672 name: &'static str,
1673 value: &T,
1674 ) -> Result<Self::Ok, Self::Error>
1675 where
1676 T: ?Sized + ser::Serialize,
1677 {
1678 self.serialize_newtype_struct_with_schema(name, value, self.root_schema)
1679 }
1680
1681 fn serialize_newtype_variant<T>(
1682 self,
1683 name: &'static str,
1684 variant_index: u32,
1685 variant: &'static str,
1686 value: &T,
1687 ) -> Result<Self::Ok, Self::Error>
1688 where
1689 T: ?Sized + ser::Serialize,
1690 {
1691 self.serialize_newtype_variant_with_schema(
1692 name,
1693 variant_index,
1694 variant,
1695 value,
1696 self.root_schema,
1697 )
1698 }
1699
1700 fn serialize_seq(self, len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
1701 self.serialize_seq_with_schema(len, self.root_schema)
1702 }
1703
1704 fn serialize_tuple(self, len: usize) -> Result<Self::SerializeTuple, Self::Error> {
1705 self.serialize_tuple_with_schema(len, self.root_schema)
1706 }
1707
1708 fn serialize_tuple_struct(
1709 self,
1710 name: &'static str,
1711 len: usize,
1712 ) -> Result<Self::SerializeTupleStruct, Self::Error> {
1713 self.serialize_tuple_struct_with_schema(name, len, self.root_schema)
1714 }
1715
1716 fn serialize_tuple_variant(
1717 self,
1718 name: &'static str,
1719 variant_index: u32,
1720 variant: &'static str,
1721 len: usize,
1722 ) -> Result<Self::SerializeTupleVariant, Self::Error> {
1723 self.serialize_tuple_variant_with_schema(
1724 name,
1725 variant_index,
1726 variant,
1727 len,
1728 self.root_schema,
1729 )
1730 }
1731
1732 fn serialize_map(self, len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
1733 self.serialize_map_with_schema(len, self.root_schema)
1734 }
1735
1736 fn serialize_struct(
1737 self,
1738 name: &'static str,
1739 len: usize,
1740 ) -> Result<Self::SerializeStruct, Self::Error> {
1741 self.serialize_struct_with_schema(name, len, self.root_schema)
1742 }
1743
1744 fn serialize_struct_variant(
1745 self,
1746 name: &'static str,
1747 variant_index: u32,
1748 variant: &'static str,
1749 len: usize,
1750 ) -> Result<Self::SerializeStructVariant, Self::Error> {
1751 self.serialize_struct_variant_with_schema(
1752 name,
1753 variant_index,
1754 variant,
1755 len,
1756 self.root_schema,
1757 )
1758 }
1759
1760 fn is_human_readable(&self) -> bool {
1761 crate::util::is_human_readable()
1762 }
1763}
1764
1765#[cfg(test)]
1766mod tests {
1767 use super::*;
1768 use crate::{
1769 Days, Duration, Millis, Months, decimal::Decimal, error::Details, schema::ResolvedSchema,
1770 };
1771 use apache_avro_test_helper::TestResult;
1772 use bigdecimal::BigDecimal;
1773 use num_bigint::{BigInt, Sign};
1774 use serde::Serialize;
1775 use serde_bytes::{ByteArray, Bytes};
1776 use serial_test::serial;
1777 use std::{
1778 collections::{BTreeMap, HashMap},
1779 marker::PhantomData,
1780 sync::atomic::Ordering,
1781 };
1782 use uuid::Uuid;
1783
/// A `Null` schema accepts `()` and every `None`, rejects anything carrying a value,
/// and writes no bytes.
#[test]
fn test_serialize_null() -> TestResult {
    let null_schema = Schema::Null;
    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &null_schema, &names, None);

    // Accepted: unit and `None` of any type
    ().serialize(&mut ser)?;
    None::<()>.serialize(&mut ser)?;
    None::<i32>.serialize(&mut ser)?;
    None::<String>.serialize(&mut ser)?;

    // Rejected: values that are not null
    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    // Nothing must have reached the output
    assert!(out.is_empty());

    Ok(())
}
1802
/// A `Boolean` schema writes one byte per value and rejects strings.
#[test]
fn test_serialize_bool() -> TestResult {
    let bool_schema = Schema::Boolean;
    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &bool_schema, &names, None);

    for flag in [true, false] {
        flag.serialize(&mut ser)?;
    }

    // Rejected: values that are not booleans
    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    assert_eq!(out, [1, 0]);

    Ok(())
}
1819
/// An `Int` schema accepts the integer types up to 32 bits and rejects strings.
#[test]
fn test_serialize_int() -> TestResult {
    let int_schema = Schema::Int;
    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &int_schema, &names, None);

    // Unsigned inputs
    4u8.serialize(&mut ser)?;
    31u16.serialize(&mut ser)?;
    13u32.serialize(&mut ser)?;
    // Signed inputs
    7i8.serialize(&mut ser)?;
    (-57i16).serialize(&mut ser)?;
    129i32.serialize(&mut ser)?;

    // Rejected: values that are not integers
    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    assert_eq!(out, [8, 62, 26, 14, 113, 130, 2]);

    Ok(())
}
1840
/// A `Long` schema accepts every integer type up to 64 bits and rejects strings.
#[test]
fn test_serialize_long() -> TestResult {
    let long_schema = Schema::Long;
    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &long_schema, &names, None);

    // Unsigned inputs
    4u8.serialize(&mut ser)?;
    31u16.serialize(&mut ser)?;
    13u32.serialize(&mut ser)?;
    291u64.serialize(&mut ser)?;
    // Signed inputs
    7i8.serialize(&mut ser)?;
    (-57i16).serialize(&mut ser)?;
    129i32.serialize(&mut ser)?;
    (-432i64).serialize(&mut ser)?;

    // Rejected: values that are not integers
    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    assert_eq!(out, [8, 62, 26, 198, 4, 14, 113, 130, 2, 223, 6]);

    Ok(())
}
1866
/// A `Float` schema accepts `f32` and `f64` inputs, writing four bytes for each, and
/// rejects strings.
#[test]
fn test_serialize_float() -> TestResult {
    let float_schema = Schema::Float;
    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &float_schema, &names, None);

    4.7f32.serialize(&mut ser)?;
    (-14.1f64).serialize(&mut ser)?;

    // Rejected: values that are not floating point numbers
    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    assert_eq!(out, [102, 102, 150, 64, 154, 153, 97, 193]);

    Ok(())
}
1883
/// A `Double` schema accepts `f32` and `f64` inputs and rejects strings.
///
/// This test previously used `Schema::Float` and therefore duplicated
/// `test_serialize_float` without ever exercising the `Double` schema.
#[test]
fn test_serialize_double() -> TestResult {
    let schema = Schema::Double;
    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    4.7f32.serialize(&mut serializer)?;
    (-14.1f64).serialize(&mut serializer)?;
    assert!("".serialize(&mut serializer).is_err());
    assert!(Some("").serialize(&mut serializer).is_err());

    // Avro encodes a double as 8 little-endian bytes of its IEEE 754 representation;
    // the `f32` input is expected to be widened to `f64` before being written.
    let mut expected: Vec<u8> = Vec::new();
    expected.extend_from_slice(&f64::from(4.7f32).to_le_bytes());
    expected.extend_from_slice(&(-14.1f64).to_le_bytes());
    assert_eq!(buffer, expected);

    Ok(())
}
1900
/// A `Bytes` schema accepts chars, strings and byte slices, each written with a length
/// prefix, and rejects unit-like values.
#[test]
fn test_serialize_bytes() -> TestResult {
    let bytes_schema = Schema::Bytes;
    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &bytes_schema, &names, None);

    'a'.serialize(&mut ser)?;
    "test".serialize(&mut ser)?;
    Bytes::new(&[12, 3, 7, 91, 4]).serialize(&mut ser)?;

    // Rejected: values without content
    assert!(().serialize(&mut ser).is_err());
    assert!(PhantomData::<String>.serialize(&mut ser).is_err());

    let expected = [2, b'a', 8, b't', b'e', b's', b't', 10, 12, 3, 7, 91, 4];
    assert_eq!(out, expected);

    Ok(())
}
1921
/// A `String` schema accepts chars, strings and byte slices, each written with a length
/// prefix, and rejects unit-like values.
#[test]
fn test_serialize_string() -> TestResult {
    let string_schema = Schema::String;
    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &string_schema, &names, None);

    'a'.serialize(&mut ser)?;
    "test".serialize(&mut ser)?;
    Bytes::new(&[12, 3, 7, 91, 4]).serialize(&mut ser)?;

    // Rejected: values without content
    assert!(().serialize(&mut ser).is_err());
    assert!(PhantomData::<String>.serialize(&mut ser).is_err());

    let expected = [2, b'a', 8, b't', b'e', b's', b't', 10, 12, 3, 7, 91, 4];
    assert_eq!(out, expected);

    Ok(())
}
1942
/// A `Record` schema accepts a struct whose (renamed) field names match the schema and
/// rejects structs with other field names as well as non-struct values.
#[test]
fn test_serialize_record() -> TestResult {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct GoodTestRecord {
        string_field: String,
        int_field: i32,
    }

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct BadTestRecord {
        foo_string_field: String,
        bar_int_field: i32,
    }

    let record_schema = Schema::parse_str(
        r#"{
            "type": "record",
            "name": "TestRecord",
            "fields": [
                {"name": "stringField", "type": "string"},
                {"name": "intField", "type": "int"}
            ]
        }"#,
    )?;

    let names = HashMap::new();
    let mut out: Vec<u8> = Vec::new();
    let mut ser = SchemaAwareWriteSerializer::new(&mut out, &record_schema, &names, None);

    // Field names line up with the schema
    let matching = GoodTestRecord {
        string_field: String::from("test"),
        int_field: 10,
    };
    matching.serialize(&mut ser)?;

    // Field names are unknown to the schema
    let mismatching = BadTestRecord {
        foo_string_field: String::from("test"),
        bar_int_field: 10,
    };
    assert!(mismatching.serialize(&mut ser).is_err());

    // Rejected: values that are not structs
    assert!("".serialize(&mut ser).is_err());
    assert!(Some("").serialize(&mut ser).is_err());

    // Only the matching record is expected in the output
    assert_eq!(out, [8, b't', b'e', b's', b't', 20]);

    Ok(())
}
1993
/// A record schema without fields accepts a unit struct, writes no bytes, and rejects
/// both a struct with fields and `()`.
#[test]
fn test_serialize_empty_record() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": "record",
            "name": "EmptyRecord",
            "fields": []
        }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    #[derive(Serialize)]
    struct EmptyRecord;
    EmptyRecord.serialize(&mut serializer)?;

    #[derive(Serialize)]
    struct NonEmptyRecord {
        foo: String,
    }
    let record = NonEmptyRecord {
        foo: "bar".to_string(),
    };
    match record
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::FieldName(field_name)) if field_name == "foo" => (),
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    match ().serialize(&mut serializer).map_err(Error::into_details) {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: binding it as `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "none");
            assert_eq!(value, "None. Cause: Expected: Record. Got: Null");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(buffer.len(), 0);

    Ok(())
}
2044
/// An `Enum` schema writes the variant index of unit variants and rejects `None`.
#[test]
fn test_serialize_enum() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": "enum",
            "name": "Suit",
            "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
        }"#,
    )?;

    #[derive(Serialize)]
    enum Suit {
        Spades,
        Hearts,
        Diamonds,
        Clubs,
    }

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    Suit::Spades.serialize(&mut serializer)?;
    Suit::Hearts.serialize(&mut serializer)?;
    Suit::Diamonds.serialize(&mut serializer)?;
    Suit::Clubs.serialize(&mut serializer)?;
    match None::<()>
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: binding it as `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "none");
            assert_eq!(value, "None. Cause: Expected: Enum. Got: Null");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(buffer.as_slice(), &[0, 2, 4, 6]);

    Ok(())
}
2091
/// An `Array` schema of longs accepts a `Vec<i64>` and rejects a `Vec<f32>`.
#[test]
fn test_serialize_array() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": "array",
            "items": "long"
        }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    let arr: Vec<i64> = vec![10, 5, 400];
    arr.serialize(&mut serializer)?;

    match vec![1_f32]
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: binding it as `schema` would shadow the outer
            // schema and make the check below compare the value with itself.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "f32");
            assert_eq!(value, "1. Cause: Expected: Long. Got: Float");
            // The element is serialized against the item schema, so that is the schema
            // the error reports, not the enclosing array schema.
            assert_eq!(error_schema, Schema::Long);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(buffer.as_slice(), &[6, 20, 10, 160, 6, 0]);

    Ok(())
}
2128
/// A `Map` schema of longs accepts a string-keyed map of `i64` and rejects a map with
/// string values.
#[test]
fn test_serialize_map() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": "map",
            "values": "long"
        }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    let mut map: BTreeMap<String, i64> = BTreeMap::new();
    map.insert(String::from("item1"), 10);
    map.insert(String::from("item2"), 400);

    map.serialize(&mut serializer)?;

    let mut map: BTreeMap<String, &str> = BTreeMap::new();
    map.insert(String::from("item1"), "value1");
    match map.serialize(&mut serializer).map_err(Error::into_details) {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: binding it as `schema` would shadow the outer
            // schema and make the check below compare the value with itself.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "string");
            assert_eq!(value, "value1. Cause: Expected: Long. Got: String");
            // The error message names the value schema (`Long`), so the reported schema
            // is expected to be the value schema rather than the enclosing map schema.
            assert_eq!(error_schema, Schema::Long);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(
        buffer.as_slice(),
        &[
            4, 10, b'i', b't', b'e', b'm', b'1', 20, 10, b'i', b't', b'e', b'm', b'2', 160, 6,
            0
        ]
    );

    Ok(())
}
2173
/// A `["null", "long"]` union accepts `Option<i64>` and a matching two-variant enum,
/// and rejects a string.
#[test]
fn test_serialize_nullable_union() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": ["null", "long"]
        }"#,
    )?;

    #[derive(Serialize)]
    enum NullableLong {
        Null,
        Long(i64),
    }

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    Some(10i64).serialize(&mut serializer)?;
    None::<i64>.serialize(&mut serializer)?;
    NullableLong::Long(400).serialize(&mut serializer)?;
    NullableLong::Null.serialize(&mut serializer)?;

    match "invalid"
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: binding it as `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "string");
            assert_eq!(
                value,
                "invalid. Cause: Expected one of the union variants [Null, Long]. Got: String"
            );
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(buffer.as_slice(), &[2, 20, 0, 2, 160, 6, 0]);

    Ok(())
}
2220
/// A `["null", "long", "string"]` union accepts a matching three-variant enum and
/// rejects an `f64`.
#[test]
fn test_serialize_union() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
            "type": ["null", "long", "string"]
        }"#,
    )?;

    #[derive(Serialize)]
    enum LongOrString {
        Null,
        Long(i64),
        Str(String),
    }

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    LongOrString::Null.serialize(&mut serializer)?;
    LongOrString::Long(400).serialize(&mut serializer)?;
    LongOrString::Str(String::from("test")).serialize(&mut serializer)?;

    match 1_f64
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Bound under a distinct name: binding it as `schema` would shadow the outer
            // schema and turn the comparison below into a tautology.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "f64");
            assert_eq!(
                value,
                "1. Cause: Cannot find a Double schema in [Null, Long, String]"
            );
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    assert_eq!(
        buffer.as_slice(),
        &[0, 2, 160, 6, 4, 8, b't', b'e', b's', b't']
    );

    Ok(())
}
2270
/// Serializes an 8-byte value against a `fixed` schema of size 8 and checks
/// the bytes written, then checks the errors reported for a byte value of the
/// wrong length and for integer arrays (which serialize as tuples).
#[test]
fn test_serialize_fixed() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "fixed",
        "size": 8,
        "name": "LongVal"
    }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    Bytes::new(&[10, 124, 31, 97, 14, 201, 3, 88]).serialize(&mut serializer)?;

    // A byte value whose length differs from the schema size.
    match Bytes::new(&[123])
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema`.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "bytes");
            assert_eq!(
                value,
                "7b. Cause: Fixed schema size (8) does not match the value length (1)"
            );
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // An integer array of the right length is still reported as a tuple.
    match [1; 8]
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "tuple");
            assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Same check with distinct elements. The scrutinee is matched by value: a
    // leading `&` here would borrow the whole `Result`, not the array.
    match [1, 2, 3, 4, 5, 6, 7, 8]
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "tuple");
            assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Expected output is the single successful write.
    assert_eq!(buffer.as_slice(), &[10, 124, 31, 97, 14, 201, 3, 88]);

    Ok(())
}
2345
/// Serializes a `Decimal` against a `bytes`-backed `decimal` schema and checks
/// the bytes written, then checks the error reported when `()` is serialized
/// with that schema.
#[test]
fn test_serialize_decimal_bytes() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 16,
        "scale": 2
    }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    let val = Decimal::from(&[251, 155]);
    val.serialize(&mut serializer)?;

    match ().serialize(&mut serializer).map_err(Error::into_details) {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema` and
            // the comparison below checks something.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "none");
            assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Length prefix (2, zigzag-encoded as 4) followed by the two value bytes.
    assert_eq!(buffer.as_slice(), &[4, 251, 155]);

    Ok(())
}
2381
/// Serializes a `Decimal` against a `fixed`-backed `decimal` schema of size 8
/// and checks the bytes written, then checks the error reported when `()` is
/// serialized with that schema.
#[test]
fn test_serialize_decimal_fixed() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "fixed",
        "name": "FixedDecimal",
        "size": 8,
        "logicalType": "decimal",
        "precision": 16,
        "scale": 2
    }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    let val = Decimal::from(&[0, 0, 0, 0, 0, 0, 251, 155]);
    val.serialize(&mut serializer)?;

    match ().serialize(&mut serializer).map_err(Error::into_details) {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema` and
            // the comparison below checks something.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "none");
            assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // The expected output is exactly the eight value bytes, with no length prefix.
    assert_eq!(buffer.as_slice(), &[0, 0, 0, 0, 0, 0, 251, 155]);

    Ok(())
}
2419
/// Serializes a `BigDecimal` (unscaled value 50024, scale 2) against a
/// `big-decimal` schema and compares the output with a fixed byte sequence.
///
/// Runs under the `serde_is_human_readable` serial group because it writes the
/// shared `SERDE_HUMAN_READABLE` flag.
#[test]
#[serial(serde_is_human_readable)]
fn test_serialize_bigdecimal() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "bytes",
        "logicalType": "big-decimal"
    }"#,
    )?;

    crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);

    let mut encoded: Vec<u8> = Vec::new();
    let no_names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut encoded, &schema, &no_names, None);

    BigDecimal::new(BigInt::new(Sign::Plus, vec![50024]), 2).serialize(&mut serializer)?;

    let expected: &[u8] = &[10, 6, 0, 195, 104, 4];
    assert_eq!(encoded.as_slice(), expected);

    Ok(())
}
2442
/// Serializes a `Uuid` against a `string`-backed `uuid` schema and checks the
/// bytes written, then checks the error reported when a `u8` is serialized
/// with that schema.
///
/// Runs under the `serde_is_human_readable` serial group because it writes the
/// shared `SERDE_HUMAN_READABLE` flag.
#[test]
#[serial(serde_is_human_readable)]
fn test_serialize_uuid() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "string",
        "logicalType": "uuid"
    }"#,
    )?;

    crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);
    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    "8c28da81-238c-4326-bddd-4e3d00cc5099"
        .parse::<Uuid>()?
        .serialize(&mut serializer)?;

    match 1_u8.serialize(&mut serializer).map_err(Error::into_details) {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema` and
            // the comparison below checks something.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "u8");
            assert_eq!(value, "1. Cause: Expected: Uuid. Got: Int");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Length prefix (36, zigzag-encoded as 72) followed by the hyphenated text form.
    assert_eq!(
        buffer.as_slice(),
        &[
            72, b'8', b'c', b'2', b'8', b'd', b'a', b'8', b'1', b'-', b'2', b'3', b'8', b'c',
            b'-', b'4', b'3', b'2', b'6', b'-', b'b', b'd', b'd', b'd', b'-', b'4', b'e', b'3',
            b'd', b'0', b'0', b'c', b'c', b'5', b'0', b'9', b'9'
        ]
    );

    Ok(())
}
2486
/// Serializes several integer widths against an `int`-backed `date` schema and
/// checks the bytes written, then checks the error reported when an `f32` is
/// serialized with that schema.
#[test]
fn test_serialize_date() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "int",
        "logicalType": "date"
    }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    100_u8.serialize(&mut serializer)?;
    1000_u16.serialize(&mut serializer)?;
    10000_u32.serialize(&mut serializer)?;
    1000_i16.serialize(&mut serializer)?;
    10000_i32.serialize(&mut serializer)?;

    match 10000_f32
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema` and
            // the comparison below checks something.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "f32");
            assert_eq!(value, "10000. Cause: Expected: Date. Got: Float");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Zigzag varints: 100 -> [200, 1], 1000 -> [208, 15], 10000 -> [160, 156, 1].
    assert_eq!(
        buffer.as_slice(),
        &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1]
    );

    Ok(())
}
2529
/// Serializes several integer widths against an `int`-backed `time-millis`
/// schema and checks the bytes written, then checks the error reported when an
/// `f32` is serialized with that schema.
#[test]
fn test_serialize_time_millis() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "int",
        "logicalType": "time-millis"
    }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    100_u8.serialize(&mut serializer)?;
    1000_u16.serialize(&mut serializer)?;
    10000_u32.serialize(&mut serializer)?;
    1000_i16.serialize(&mut serializer)?;
    10000_i32.serialize(&mut serializer)?;

    match 10000_f32
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema` and
            // the comparison below checks something.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "f32");
            assert_eq!(value, "10000. Cause: Expected: TimeMillis. Got: Float");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Zigzag varints: 100 -> [200, 1], 1000 -> [208, 15], 10000 -> [160, 156, 1].
    assert_eq!(
        buffer.as_slice(),
        &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1]
    );

    Ok(())
}
2572
/// Serializes several integer widths against a `long`-backed `time-micros`
/// schema and checks the bytes written, then checks the error reported when an
/// `f32` is serialized with that schema.
#[test]
fn test_serialize_time_micros() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "long",
        "logicalType": "time-micros"
    }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    100_u8.serialize(&mut serializer)?;
    1000_u16.serialize(&mut serializer)?;
    10000_u32.serialize(&mut serializer)?;
    1000_i16.serialize(&mut serializer)?;
    10000_i32.serialize(&mut serializer)?;
    10000_i64.serialize(&mut serializer)?;

    match 10000_f32
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema` and
            // the comparison below checks something.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "f32");
            assert_eq!(value, "10000. Cause: Expected: TimeMicros. Got: Float");
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Zigzag varints: 100 -> [200, 1], 1000 -> [208, 15], 10000 -> [160, 156, 1].
    assert_eq!(
        buffer.as_slice(),
        &[
            200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1
        ]
    );

    Ok(())
}
2618
/// For each of the `timestamp-millis`, `timestamp-micros` and
/// `timestamp-nanos` logical types: serializes several integer widths, checks
/// the bytes written, and checks the error reported when an `f64` is
/// serialized with that schema.
#[test]
fn test_serialize_timestamp() -> TestResult {
    for precision in ["millis", "micros", "nanos"] {
        let schema = Schema::parse_str(&format!(
            r#"{{
            "type": "long",
            "logicalType": "timestamp-{precision}"
        }}"#
        ))?;

        let mut buffer: Vec<u8> = Vec::new();
        let names = HashMap::new();
        let mut serializer =
            SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

        100_u8.serialize(&mut serializer)?;
        1000_u16.serialize(&mut serializer)?;
        10000_u32.serialize(&mut serializer)?;
        1000_i16.serialize(&mut serializer)?;
        10000_i32.serialize(&mut serializer)?;
        10000_i64.serialize(&mut serializer)?;

        match 10000_f64
            .serialize(&mut serializer)
            .map_err(Error::into_details)
        {
            Err(Details::SerializeValueWithSchema {
                value_type,
                value,
                // Named separately so it does not shadow the outer `schema`
                // and the comparison below checks something.
                schema: error_schema,
            }) => {
                // The expected message names the schema with a capitalised
                // precision suffix, e.g. `TimestampMillis`.
                let mut capital_precision = precision.to_string();
                if let Some(c) = capital_precision.chars().next() {
                    capital_precision.replace_range(..1, &c.to_uppercase().to_string());
                }
                assert_eq!(value_type, "f64");
                assert_eq!(
                    value,
                    format!(
                        "10000. Cause: Expected: Timestamp{capital_precision}. Got: Double"
                    )
                );
                assert_eq!(error_schema, schema);
            }
            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
        }

        // Zigzag varints: 100 -> [200, 1], 1000 -> [208, 15], 10000 -> [160, 156, 1].
        assert_eq!(
            buffer.as_slice(),
            &[
                200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1
            ]
        );
    }

    Ok(())
}
2676
/// Serializes a `Duration` (3 months, 2 days, 1200 ms) against a 12-byte
/// `fixed` schema with the `duration` logical type and checks the bytes
/// written, then checks the error reported for an integer array of length 12.
#[test]
fn test_serialize_duration() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "fixed",
        "size": 12,
        "name": "duration",
        "logicalType": "duration"
    }"#,
    )?;

    let mut buffer: Vec<u8> = Vec::new();
    let names = HashMap::new();
    let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);

    let duration_bytes =
        ByteArray::new(Duration::new(Months::new(3), Days::new(2), Millis::new(1200)).into());
    duration_bytes.serialize(&mut serializer)?;

    match [1; 12]
        .serialize(&mut serializer)
        .map_err(Error::into_details)
    {
        Err(Details::SerializeValueWithSchema {
            value_type,
            value,
            // Named separately so it does not shadow the outer `schema` and
            // the comparison below checks something.
            schema: error_schema,
        }) => {
            assert_eq!(value_type, "tuple");
            assert_eq!(
                value,
                "tuple (len=12). Cause: Expected: Duration. Got: Array"
            );
            assert_eq!(error_schema, schema);
        }
        unexpected => panic!("Expected an error. Got: {unexpected:?}"),
    }

    // Three little-endian 4-byte groups: months = 3, days = 2, millis = 1200.
    assert_eq!(buffer.as_slice(), &[3, 0, 0, 0, 2, 0, 0, 0, 176, 4, 0, 0]);

    Ok(())
}
2719
/// Serializes a record whose `innerRecord` field is a `["null", "TestRecord"]`
/// union referring back to the record itself, nested one level deep, and
/// compares the output with a fixed byte sequence.
///
/// Runs under the `serde_is_human_readable` serial group because it writes the
/// shared `SERDE_HUMAN_READABLE` flag.
#[test]
#[serial(serde_is_human_readable)]
fn test_serialize_recursive_record() -> TestResult {
    let schema = Schema::parse_str(
        r#"{
        "type": "record",
        "name": "TestRecord",
        "fields": [
            {"name": "stringField", "type": "string"},
            {"name": "intField", "type": "int"},
            {"name": "bigDecimalField", "type": {"type": "bytes", "logicalType": "big-decimal"}},
            {"name": "uuidField", "type": "fixed", "size": 16, "logicalType": "uuid"},
            {"name": "innerRecord", "type": ["null", "TestRecord"]}
        ]
    }"#,
    )?;

    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct TestRecord {
        string_field: String,
        int_field: i32,
        big_decimal_field: BigDecimal,
        uuid_field: Uuid,
        inner_record: Option<Box<TestRecord>>,
    }

    crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);

    let mut encoded: Vec<u8> = Vec::new();
    let resolved = ResolvedSchema::try_from(&schema)?;
    let mut serializer =
        SchemaAwareWriteSerializer::new(&mut encoded, &schema, resolved.get_names(), None);

    // Outer record wrapping one nested record that itself has no inner record.
    let record = TestRecord {
        string_field: String::from("test"),
        int_field: 10,
        big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![50024]), 2),
        uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5098".parse::<Uuid>()?,
        inner_record: Some(Box::new(TestRecord {
            string_field: String::from("inner_test"),
            int_field: 100,
            big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![20038]), 2),
            uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5099".parse::<Uuid>()?,
            inner_record: None,
        })),
    };
    record.serialize(&mut serializer)?;

    let expected: &[u8] = &[
        8, 116, 101, 115, 116, 20, 10, 6, 0, 195, 104, 4, 72, 56, 99, 50, 56, 100, 97, 56, 49,
        45, 50, 51, 56, 99, 45, 52, 51, 50, 54, 45, 98, 100, 100, 100, 45, 52, 101, 51, 100, 48,
        48, 99, 99, 53, 48, 57, 56, 2, 20, 105, 110, 110, 101, 114, 95, 116, 101, 115, 116, 200,
        1, 8, 4, 78, 70, 4, 72, 56, 99, 50, 56, 100, 97, 56, 49, 45, 50, 51, 56, 99, 45, 52, 51,
        50, 54, 45, 98, 100, 100, 100, 45, 52, 101, 51, 100, 48, 48, 99, 99, 53, 48, 57, 57, 0,
    ];
    assert_eq!(encoded.as_slice(), expected);

    Ok(())
}
2783}