1use crate::schema::{DecimalSchema, InnerDecimalSchema, UuidSchema};
22use crate::{
23 bigdecimal::big_decimal_as_bytes,
24 encode::{encode_int, encode_long},
25 error::{Details, Error},
26 schema::{Name, NamesRef, Namespace, RecordField, RecordSchema, Schema},
27 serde::util::StringSerializer,
28};
29use bigdecimal::BigDecimal;
30use serde::{Serialize, ser};
31use std::{borrow::Cow, cmp::Ordering, collections::HashMap, io::Write, str::FromStr};
32
33const COLLECTION_SERIALIZER_ITEM_LIMIT: usize = 1024;
34const COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY: usize = 32;
35const SINGLE_VALUE_INIT_BUFFER_SIZE: usize = 128;
36
37pub struct SchemaAwareWriteSerializeSeq<'a, 's, W: Write> {
44 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
45 item_schema: &'s Schema,
46 item_buffer_size: usize,
47 item_buffers: Vec<Vec<u8>>,
48 bytes_written: usize,
49}
50
51impl<'a, 's, W: Write> SchemaAwareWriteSerializeSeq<'a, 's, W> {
52 fn new(
53 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
54 item_schema: &'s Schema,
55 len: Option<usize>,
56 ) -> SchemaAwareWriteSerializeSeq<'a, 's, W> {
57 SchemaAwareWriteSerializeSeq {
58 ser,
59 item_schema,
60 item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
61 item_buffers: Vec::with_capacity(
62 len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY),
63 ),
64 bytes_written: 0,
65 }
66 }
67
68 fn write_buffered_items(&mut self) -> Result<(), Error> {
69 if !self.item_buffers.is_empty() {
70 self.bytes_written +=
71 encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
72 for item in self.item_buffers.drain(..) {
73 self.bytes_written += self
74 .ser
75 .writer
76 .write(item.as_slice())
77 .map_err(Details::WriteBytes)?;
78 }
79 }
80
81 Ok(())
82 }
83
84 fn serialize_element<T: ser::Serialize>(&mut self, value: &T) -> Result<(), Error> {
85 let mut item_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
86 let mut item_ser = SchemaAwareWriteSerializer::new(
87 &mut item_buffer,
88 self.item_schema,
89 self.ser.names,
90 self.ser.enclosing_namespace.clone(),
91 );
92 value.serialize(&mut item_ser)?;
93
94 self.item_buffer_size = std::cmp::max(self.item_buffer_size, item_buffer.len() + 16);
95
96 self.item_buffers.push(item_buffer);
97
98 if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
99 self.write_buffered_items()?;
100 }
101
102 Ok(())
103 }
104
105 fn end(mut self) -> Result<usize, Error> {
106 self.write_buffered_items()?;
107 self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Details::WriteBytes)?;
108
109 Ok(self.bytes_written)
110 }
111}
112
113impl<W: Write> ser::SerializeSeq for SchemaAwareWriteSerializeSeq<'_, '_, W> {
114 type Ok = usize;
115 type Error = Error;
116
117 fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
118 where
119 T: ?Sized + ser::Serialize,
120 {
121 self.serialize_element(&value)
122 }
123
124 fn end(self) -> Result<Self::Ok, Self::Error> {
125 self.end()
126 }
127}
128
129impl<W: Write> ser::SerializeTuple for SchemaAwareWriteSerializeSeq<'_, '_, W> {
130 type Ok = usize;
131 type Error = Error;
132
133 fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
134 where
135 T: ?Sized + ser::Serialize,
136 {
137 ser::SerializeSeq::serialize_element(self, value)
138 }
139
140 fn end(self) -> Result<Self::Ok, Self::Error> {
141 ser::SerializeSeq::end(self)
142 }
143}
144
145pub struct SchemaAwareWriteSerializeMap<'a, 's, W: Write> {
152 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
153 item_schema: &'s Schema,
154 item_buffer_size: usize,
155 item_buffers: Vec<Vec<u8>>,
156 bytes_written: usize,
157}
158
159impl<'a, 's, W: Write> SchemaAwareWriteSerializeMap<'a, 's, W> {
160 fn new(
161 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
162 item_schema: &'s Schema,
163 len: Option<usize>,
164 ) -> SchemaAwareWriteSerializeMap<'a, 's, W> {
165 SchemaAwareWriteSerializeMap {
166 ser,
167 item_schema,
168 item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
169 item_buffers: Vec::with_capacity(
170 len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY),
171 ),
172 bytes_written: 0,
173 }
174 }
175
176 fn write_buffered_items(&mut self) -> Result<(), Error> {
177 if !self.item_buffers.is_empty() {
178 self.bytes_written +=
179 encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
180 for item in self.item_buffers.drain(..) {
181 self.bytes_written += self
182 .ser
183 .writer
184 .write(item.as_slice())
185 .map_err(Details::WriteBytes)?;
186 }
187 }
188
189 Ok(())
190 }
191}
192
193impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeMap<'_, '_, W> {
194 type Ok = usize;
195 type Error = Error;
196
197 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
198 where
199 T: ?Sized + ser::Serialize,
200 {
201 let mut element_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
202 let string_schema = Schema::String;
203 let mut key_ser = SchemaAwareWriteSerializer::new(
204 &mut element_buffer,
205 &string_schema,
206 self.ser.names,
207 self.ser.enclosing_namespace.clone(),
208 );
209 key.serialize(&mut key_ser)?;
210
211 self.item_buffers.push(element_buffer);
212
213 Ok(())
214 }
215
216 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
217 where
218 T: ?Sized + ser::Serialize,
219 {
220 let last_index = self.item_buffers.len() - 1;
221 let element_buffer = &mut self.item_buffers[last_index];
222 let mut val_ser = SchemaAwareWriteSerializer::new(
223 element_buffer,
224 self.item_schema,
225 self.ser.names,
226 self.ser.enclosing_namespace.clone(),
227 );
228 value.serialize(&mut val_ser)?;
229
230 self.item_buffer_size = std::cmp::max(self.item_buffer_size, element_buffer.len() + 16);
231
232 if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
233 self.write_buffered_items()?;
234 }
235
236 Ok(())
237 }
238
239 fn end(mut self) -> Result<Self::Ok, Self::Error> {
240 self.write_buffered_items()?;
241 self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Details::WriteBytes)?;
242
243 Ok(self.bytes_written)
244 }
245}
246
247pub struct SchemaAwareWriteSerializeStruct<'a, 's, W: Write> {
252 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
253 record_schema: &'s RecordSchema,
254 field_cache: HashMap<usize, Vec<u8>>,
256 map_field_name: Option<String>,
258 field_position: usize,
259 bytes_written: usize,
260}
261
262impl<'a, 's, W: Write> SchemaAwareWriteSerializeStruct<'a, 's, W> {
263 fn new(
264 ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
265 record_schema: &'s RecordSchema,
266 ) -> SchemaAwareWriteSerializeStruct<'a, 's, W> {
267 SchemaAwareWriteSerializeStruct {
268 ser,
269 record_schema,
270 field_cache: HashMap::new(),
271 map_field_name: None,
272 field_position: 0,
273 bytes_written: 0,
274 }
275 }
276
277 fn serialize_next_field<T>(&mut self, field: &RecordField, value: &T) -> Result<(), Error>
278 where
279 T: ?Sized + ser::Serialize,
280 {
281 match self.field_position.cmp(&field.position) {
282 Ordering::Equal => {
283 let mut value_ser = SchemaAwareWriteSerializer::new(
285 &mut *self.ser.writer,
286 &field.schema,
287 self.ser.names,
288 self.ser.enclosing_namespace.clone(),
289 );
290 self.bytes_written += value.serialize(&mut value_ser)?;
291
292 self.field_position += 1;
293 while let Some(bytes) = self.field_cache.remove(&self.field_position) {
294 self.ser
295 .writer
296 .write_all(&bytes)
297 .map_err(Details::WriteBytes)?;
298 self.bytes_written += bytes.len();
299 self.field_position += 1;
300 }
301 Ok(())
302 }
303 Ordering::Less => {
304 let mut bytes = Vec::new();
307 let mut value_ser = SchemaAwareWriteSerializer::new(
308 &mut bytes,
309 &field.schema,
310 self.ser.names,
311 self.ser.enclosing_namespace.clone(),
312 );
313 value.serialize(&mut value_ser)?;
314 if self.field_cache.insert(field.position, bytes).is_some() {
315 Err(Details::FieldNameDuplicate(field.name.clone()).into())
316 } else {
317 Ok(())
318 }
319 }
320 Ordering::Greater => {
321 Err(Details::FieldNameDuplicate(field.name.clone()).into())
324 }
325 }
326 }
327
328 fn end(mut self) -> Result<usize, Error> {
329 while self.field_position != self.record_schema.fields.len() {
331 let field_info = &self.record_schema.fields[self.field_position];
332 if let Some(bytes) = self.field_cache.remove(&self.field_position) {
333 self.ser
334 .writer
335 .write_all(&bytes)
336 .map_err(Details::WriteBytes)?;
337 self.bytes_written += bytes.len();
338 self.field_position += 1;
339 } else if let Some(default) = &field_info.default {
340 self.serialize_next_field(field_info, default)
341 .map_err(|e| Details::SerializeRecordFieldWithSchema {
342 field_name: field_info.name.clone(),
343 record_schema: Schema::Record(self.record_schema.clone()),
344 error: Box::new(e),
345 })?;
346 } else {
347 return Err(Details::MissingDefaultForSkippedField {
348 field_name: field_info.name.clone(),
349 schema: Schema::Record(self.record_schema.clone()),
350 }
351 .into());
352 }
353 }
354
355 debug_assert!(
356 self.field_cache.is_empty(),
357 "There should be no more unwritten fields at this point: {:?}",
358 self.field_cache
359 );
360 debug_assert!(
361 self.map_field_name.is_none(),
362 "There should be no field name at this point: field {:?}",
363 self.map_field_name
364 );
365 Ok(self.bytes_written)
366 }
367}
368
369impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_, W> {
370 type Ok = usize;
371 type Error = Error;
372
373 fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
374 where
375 T: ?Sized + ser::Serialize,
376 {
377 let record_field = self
378 .record_schema
379 .lookup
380 .get(key)
381 .and_then(|idx| self.record_schema.fields.get(*idx));
382
383 match record_field {
384 Some(field) => self.serialize_next_field(field, value).map_err(|e| {
385 Details::SerializeRecordFieldWithSchema {
386 field_name: key.to_string(),
387 record_schema: Schema::Record(self.record_schema.clone()),
388 error: Box::new(e),
389 }
390 .into()
391 }),
392 None => Err(Details::FieldName(String::from(key)).into()),
393 }
394 }
395
396 fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> {
397 let skipped_field = self
398 .record_schema
399 .lookup
400 .get(key)
401 .and_then(|idx| self.record_schema.fields.get(*idx));
402
403 if let Some(skipped_field) = skipped_field {
404 if let Some(default) = &skipped_field.default {
405 self.serialize_next_field(skipped_field, default)
406 .map_err(|e| Details::SerializeRecordFieldWithSchema {
407 field_name: key.to_string(),
408 record_schema: Schema::Record(self.record_schema.clone()),
409 error: Box::new(e),
410 })?;
411 } else {
412 return Err(Details::MissingDefaultForSkippedField {
413 field_name: key.to_string(),
414 schema: Schema::Record(self.record_schema.clone()),
415 }
416 .into());
417 }
418 } else {
419 return Err(Details::GetField(key.to_string()).into());
420 }
421
422 Ok(())
423 }
424
425 fn end(self) -> Result<Self::Ok, Self::Error> {
426 self.end()
427 }
428}
429
430impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeStruct<'_, '_, W> {
432 type Ok = usize;
433 type Error = Error;
434
435 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
436 where
437 T: ?Sized + Serialize,
438 {
439 let name = key.serialize(StringSerializer)?;
440 let old = self.map_field_name.replace(name);
441 debug_assert!(
442 old.is_none(),
443 "Expected a value instead of a key: old key: {old:?}, new key: {:?}",
444 self.map_field_name
445 );
446 Ok(())
447 }
448
449 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
450 where
451 T: ?Sized + Serialize,
452 {
453 let key = self.map_field_name.take().ok_or(Details::MapNoKey)?;
454 let record_field = self
455 .record_schema
456 .lookup
457 .get(&key)
458 .and_then(|idx| self.record_schema.fields.get(*idx));
459 match record_field {
460 Some(field) => self.serialize_next_field(field, value).map_err(|e| {
461 Details::SerializeRecordFieldWithSchema {
462 field_name: key.to_string(),
463 record_schema: Schema::Record(self.record_schema.clone()),
464 error: Box::new(e),
465 }
466 .into()
467 }),
468 None => Err(Details::FieldName(key).into()),
469 }
470 }
471
472 fn end(self) -> Result<Self::Ok, Self::Error> {
473 self.end()
474 }
475}
476
477impl<W: Write> ser::SerializeStructVariant for SchemaAwareWriteSerializeStruct<'_, '_, W> {
478 type Ok = usize;
479 type Error = Error;
480
481 fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
482 where
483 T: ?Sized + ser::Serialize,
484 {
485 ser::SerializeStruct::serialize_field(self, key, value)
486 }
487
488 fn end(self) -> Result<Self::Ok, Self::Error> {
489 ser::SerializeStruct::end(self)
490 }
491}
492
493pub enum SchemaAwareWriteSerializeMapOrStruct<'a, 's, W: Write> {
497 Struct(SchemaAwareWriteSerializeStruct<'a, 's, W>),
498 Map(SchemaAwareWriteSerializeMap<'a, 's, W>),
499}
500
501impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeMapOrStruct<'_, '_, W> {
502 type Ok = usize;
503 type Error = Error;
504
505 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
506 where
507 T: ?Sized + Serialize,
508 {
509 match self {
510 Self::Struct(s) => s.serialize_key(key),
511 Self::Map(s) => s.serialize_key(key),
512 }
513 }
514
515 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
516 where
517 T: ?Sized + Serialize,
518 {
519 match self {
520 Self::Struct(s) => s.serialize_value(value),
521 Self::Map(s) => s.serialize_value(value),
522 }
523 }
524
525 fn end(self) -> Result<Self::Ok, Self::Error> {
526 match self {
527 Self::Struct(s) => s.end(),
528 Self::Map(s) => s.end(),
529 }
530 }
531}
532
533pub enum SchemaAwareWriteSerializeTupleStruct<'a, 's, W: Write> {
537 Record(SchemaAwareWriteSerializeStruct<'a, 's, W>),
538 Array(SchemaAwareWriteSerializeSeq<'a, 's, W>),
539}
540
541impl<W: Write> SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
542 fn serialize_field<T>(&mut self, value: &T) -> Result<(), Error>
543 where
544 T: ?Sized + ser::Serialize,
545 {
546 use SchemaAwareWriteSerializeTupleStruct::*;
547 match self {
548 Record(_record_ser) => {
549 unimplemented!("Tuple struct serialization to record is not supported!");
550 }
551 Array(array_ser) => array_ser.serialize_element(&value),
552 }
553 }
554
555 fn end(self) -> Result<usize, Error> {
556 use SchemaAwareWriteSerializeTupleStruct::*;
557 match self {
558 Record(record_ser) => record_ser.end(),
559 Array(array_ser) => array_ser.end(),
560 }
561 }
562}
563
564impl<W: Write> ser::SerializeTupleStruct for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
565 type Ok = usize;
566 type Error = Error;
567
568 fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
569 where
570 T: ?Sized + ser::Serialize,
571 {
572 self.serialize_field(&value)
573 }
574
575 fn end(self) -> Result<Self::Ok, Self::Error> {
576 self.end()
577 }
578}
579
580impl<W: Write> ser::SerializeTupleVariant for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
581 type Ok = usize;
582 type Error = Error;
583
584 fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
585 where
586 T: ?Sized + ser::Serialize,
587 {
588 self.serialize_field(&value)
589 }
590
591 fn end(self) -> Result<Self::Ok, Self::Error> {
592 self.end()
593 }
594}
595
596pub struct SchemaAwareWriteSerializer<'s, W: Write> {
602 writer: &'s mut W,
603 root_schema: &'s Schema,
604 names: &'s NamesRef<'s>,
605 enclosing_namespace: Namespace,
606}
607
608impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
609 pub fn new(
619 writer: &'s mut W,
620 schema: &'s Schema,
621 names: &'s NamesRef<'s>,
622 enclosing_namespace: Namespace,
623 ) -> SchemaAwareWriteSerializer<'s, W> {
624 SchemaAwareWriteSerializer {
625 writer,
626 root_schema: schema,
627 names,
628 enclosing_namespace,
629 }
630 }
631
632 fn get_ref_schema(&self, name: &'s Name) -> Result<&'s Schema, Error> {
633 let full_name = match name.namespace {
634 Some(_) => Cow::Borrowed(name),
635 None => Cow::Owned(Name {
636 name: name.name.clone(),
637 namespace: self.enclosing_namespace.clone(),
638 }),
639 };
640
641 let ref_schema = self.names.get(full_name.as_ref()).copied();
642
643 ref_schema.ok_or_else(|| Details::SchemaResolutionError(full_name.as_ref().clone()).into())
644 }
645
646 fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, Error> {
647 let mut bytes_written: usize = 0;
648
649 bytes_written += encode_long(bytes.len() as i64, &mut self.writer)?;
650 self.writer.write_all(bytes).map_err(Details::WriteBytes)?;
652 bytes_written += bytes.len();
653
654 Ok(bytes_written)
655 }
656
657 fn serialize_bool_with_schema(&mut self, value: bool, schema: &Schema) -> Result<usize, Error> {
658 let create_error = |cause: String| {
659 Error::new(Details::SerializeValueWithSchema {
660 value_type: "bool",
661 value: format!("{value}. Cause: {cause}"),
662 schema: schema.clone(),
663 })
664 };
665
666 match schema {
667 Schema::Boolean => self
668 .writer
669 .write(&[u8::from(value)])
670 .map_err(|e| Details::WriteBytes(e).into()),
671 Schema::Union(union_schema) => {
672 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
673 match variant_schema {
674 Schema::Boolean => {
675 encode_int(i as i32, &mut *self.writer)?;
676 return self.serialize_bool_with_schema(value, variant_schema);
677 }
678 _ => { }
679 }
680 }
681 Err(create_error(format!(
682 "No matching Schema::Bool found in {:?}",
683 union_schema.schemas
684 )))
685 }
686 expected => Err(create_error(format!("Expected {expected}. Got: Bool"))),
687 }
688 }
689
690 fn serialize_i32_with_schema(&mut self, value: i32, schema: &Schema) -> Result<usize, Error> {
691 let create_error = |cause: String| {
692 Error::new(Details::SerializeValueWithSchema {
693 value_type: "int (i8 | i16 | i32)",
694 value: format!("{value}. Cause: {cause}"),
695 schema: schema.clone(),
696 })
697 };
698
699 match schema {
700 Schema::Int | Schema::TimeMillis | Schema::Date => encode_int(value, &mut self.writer),
701 Schema::Long
702 | Schema::TimeMicros
703 | Schema::TimestampMillis
704 | Schema::TimestampMicros
705 | Schema::TimestampNanos
706 | Schema::LocalTimestampMillis
707 | Schema::LocalTimestampMicros
708 | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
709 Schema::Union(union_schema) => {
710 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
711 match variant_schema {
712 Schema::Int
713 | Schema::TimeMillis
714 | Schema::Date
715 | Schema::Long
716 | Schema::TimeMicros
717 | Schema::TimestampMillis
718 | Schema::TimestampMicros
719 | Schema::TimestampNanos
720 | Schema::LocalTimestampMillis
721 | Schema::LocalTimestampMicros
722 | Schema::LocalTimestampNanos => {
723 encode_int(i as i32, &mut *self.writer)?;
724 return self.serialize_i32_with_schema(value, variant_schema);
725 }
726 _ => { }
727 }
728 }
729 Err(create_error(format!(
730 "Cannot find a matching int-like schema in {union_schema:?}"
731 )))
732 }
733 expected => Err(create_error(format!("Expected {expected}. Got: Int/Long"))),
734 }
735 }
736
737 fn serialize_i64_with_schema(&mut self, value: i64, schema: &Schema) -> Result<usize, Error> {
738 let create_error = |cause: String| {
739 Error::new(Details::SerializeValueWithSchema {
740 value_type: "i64",
741 value: format!("{value}. Cause: {cause}"),
742 schema: schema.clone(),
743 })
744 };
745
746 match schema {
747 Schema::Int | Schema::TimeMillis | Schema::Date => {
748 let int_value =
749 i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
750 encode_int(int_value, &mut self.writer)
751 }
752 Schema::Long
753 | Schema::TimeMicros
754 | Schema::TimestampMillis
755 | Schema::TimestampMicros
756 | Schema::TimestampNanos
757 | Schema::LocalTimestampMillis
758 | Schema::LocalTimestampMicros
759 | Schema::LocalTimestampNanos => encode_long(value, &mut self.writer),
760 Schema::Union(union_schema) => {
761 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
762 match variant_schema {
763 Schema::Int
764 | Schema::TimeMillis
765 | Schema::Date
766 | Schema::Long
767 | Schema::TimeMicros
768 | Schema::TimestampMillis
769 | Schema::TimestampMicros
770 | Schema::TimestampNanos
771 | Schema::LocalTimestampMillis
772 | Schema::LocalTimestampMicros
773 | Schema::LocalTimestampNanos => {
774 encode_int(i as i32, &mut *self.writer)?;
775 return self.serialize_i64_with_schema(value, variant_schema);
776 }
777 _ => { }
778 }
779 }
780 Err(create_error(format!(
781 "Cannot find a matching int/long-like schema in {:?}",
782 union_schema.schemas
783 )))
784 }
785 expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))),
786 }
787 }
788
789 fn serialize_i128_with_schema(&mut self, value: i128, schema: &Schema) -> Result<usize, Error> {
790 let create_error = |cause: String| {
791 Error::new(Details::SerializeValueWithSchema {
792 value_type: "i128",
793 value: format!("{value}. Cause: {cause}"),
794 schema: schema.clone(),
795 })
796 };
797
798 match schema {
799 Schema::Fixed(fixed) if fixed.size == 16 && fixed.name.name == "i128" => {
800 self.writer
801 .write_all(&value.to_le_bytes())
802 .map_err(Details::WriteBytes)?;
803 Ok(16)
804 }
805 Schema::Union(union_schema) => {
806 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
807 match variant_schema {
808 Schema::Fixed(fixed) if fixed.size == 16 && fixed.name.name == "i128" => {
809 encode_int(i as i32, &mut *self.writer)?;
810 return self.serialize_i128_with_schema(value, variant_schema);
811 }
812 _ => { }
813 }
814 }
815 Err(create_error(format!(
816 "Cannot find a Fixed(size = 16, name = \"i128\") schema in {:?}",
817 union_schema.schemas
818 )))
819 }
820 expected => Err(create_error(format!("Expected {expected}. Got: i128"))),
821 }
822 }
823
824 fn serialize_u8_with_schema(&mut self, value: u8, schema: &Schema) -> Result<usize, Error> {
825 let create_error = |cause: String| {
826 Error::new(Details::SerializeValueWithSchema {
827 value_type: "u8",
828 value: format!("{value}. Cause: {cause}"),
829 schema: schema.clone(),
830 })
831 };
832
833 match schema {
834 Schema::Int | Schema::TimeMillis | Schema::Date => {
835 encode_int(value as i32, &mut self.writer)
836 }
837 Schema::Long
838 | Schema::TimeMicros
839 | Schema::TimestampMillis
840 | Schema::TimestampMicros
841 | Schema::TimestampNanos
842 | Schema::LocalTimestampMillis
843 | Schema::LocalTimestampMicros
844 | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
845 Schema::Bytes => self.write_bytes(&[value]),
846 Schema::Union(union_schema) => {
847 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
848 match variant_schema {
849 Schema::Int
850 | Schema::TimeMillis
851 | Schema::Date
852 | Schema::Long
853 | Schema::TimeMicros
854 | Schema::TimestampMillis
855 | Schema::TimestampMicros
856 | Schema::TimestampNanos
857 | Schema::LocalTimestampMillis
858 | Schema::LocalTimestampMicros
859 | Schema::LocalTimestampNanos
860 | Schema::Bytes => {
861 encode_int(i as i32, &mut *self.writer)?;
862 return self.serialize_u8_with_schema(value, variant_schema);
863 }
864 _ => { }
865 }
866 }
867 Err(create_error(format!(
868 "Cannot find a matching Int-like, Long-like or Bytes schema in {union_schema:?}"
869 )))
870 }
871 expected => Err(create_error(format!("Expected: {expected}. Got: Int"))),
872 }
873 }
874
875 fn serialize_u32_with_schema(&mut self, value: u32, schema: &Schema) -> Result<usize, Error> {
876 let create_error = |cause: String| {
877 Error::new(Details::SerializeValueWithSchema {
878 value_type: "unsigned int (u16 | u32)",
879 value: format!("{value}. Cause: {cause}"),
880 schema: schema.clone(),
881 })
882 };
883
884 match schema {
885 Schema::Int | Schema::TimeMillis | Schema::Date => {
886 let int_value =
887 i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
888 encode_int(int_value, &mut self.writer)
889 }
890 Schema::Long
891 | Schema::TimeMicros
892 | Schema::TimestampMillis
893 | Schema::TimestampMicros
894 | Schema::TimestampNanos
895 | Schema::LocalTimestampMillis
896 | Schema::LocalTimestampMicros
897 | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
898 Schema::Union(union_schema) => {
899 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
900 match variant_schema {
901 Schema::Int
902 | Schema::TimeMillis
903 | Schema::Date
904 | Schema::Long
905 | Schema::TimeMicros
906 | Schema::TimestampMillis
907 | Schema::TimestampMicros
908 | Schema::TimestampNanos
909 | Schema::LocalTimestampMillis
910 | Schema::LocalTimestampMicros
911 | Schema::LocalTimestampNanos => {
912 encode_int(i as i32, &mut *self.writer)?;
913 return self.serialize_u32_with_schema(value, variant_schema);
914 }
915 _ => { }
916 }
917 }
918 Err(create_error(format!(
919 "Cannot find a matching Int-like or Long-like schema in {union_schema:?}"
920 )))
921 }
922 expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))),
923 }
924 }
925
926 fn serialize_u64_with_schema(&mut self, value: u64, schema: &Schema) -> Result<usize, Error> {
927 let create_error = |cause: String| {
928 Error::new(Details::SerializeValueWithSchema {
929 value_type: "u64",
930 value: format!("{value}. Cause: {cause}"),
931 schema: schema.clone(),
932 })
933 };
934
935 match schema {
936 Schema::Int | Schema::TimeMillis | Schema::Date => {
937 let int_value =
938 i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
939 encode_int(int_value, &mut self.writer)
940 }
941 Schema::Long
942 | Schema::TimeMicros
943 | Schema::TimestampMillis
944 | Schema::TimestampMicros
945 | Schema::TimestampNanos
946 | Schema::LocalTimestampMillis
947 | Schema::LocalTimestampMicros
948 | Schema::LocalTimestampNanos => {
949 let long_value =
950 i64::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
951 encode_long(long_value, &mut self.writer)
952 }
953 Schema::Fixed(fixed) if fixed.size == 8 && fixed.name.name == "u64" => {
954 self.writer
955 .write_all(&value.to_le_bytes())
956 .map_err(Details::WriteBytes)?;
957 Ok(8)
958 }
959 Schema::Union(union_schema) => {
960 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
961 match variant_schema {
962 Schema::Int
963 | Schema::TimeMillis
964 | Schema::Date
965 | Schema::Long
966 | Schema::TimeMicros
967 | Schema::TimestampMillis
968 | Schema::TimestampMicros
969 | Schema::TimestampNanos
970 | Schema::LocalTimestampMillis
971 | Schema::LocalTimestampMicros
972 | Schema::LocalTimestampNanos => {
973 encode_int(i as i32, &mut *self.writer)?;
974 return self.serialize_u64_with_schema(value, variant_schema);
975 }
976 Schema::Fixed(fixed) if fixed.size == 8 && fixed.name.name == "u64" => {
977 encode_int(i as i32, &mut *self.writer)?;
978 return self.serialize_u64_with_schema(value, variant_schema);
979 }
980 _ => { }
981 }
982 }
983 Err(create_error(format!(
984 "Cannot find a matching Int-like, Long-like or Fixed(size = 8, name \"u64\") schema in {:?}",
985 union_schema.schemas
986 )))
987 }
988 expected => Err(create_error(format!("Expected {expected}. Got: u64"))),
989 }
990 }
991
992 fn serialize_u128_with_schema(&mut self, value: u128, schema: &Schema) -> Result<usize, Error> {
993 let create_error = |cause: String| {
994 Error::new(Details::SerializeValueWithSchema {
995 value_type: "u128",
996 value: format!("{value}. Cause: {cause}"),
997 schema: schema.clone(),
998 })
999 };
1000
1001 match schema {
1002 Schema::Fixed(fixed) if fixed.size == 16 && fixed.name.name == "u128" => {
1003 self.writer
1004 .write_all(&value.to_le_bytes())
1005 .map_err(Details::WriteBytes)?;
1006 Ok(16)
1007 }
1008 Schema::Union(union_schema) => {
1009 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1010 match variant_schema {
1011 Schema::Fixed(fixed) if fixed.size == 16 && fixed.name.name == "u128" => {
1012 encode_int(i as i32, &mut *self.writer)?;
1013 return self.serialize_u128_with_schema(value, variant_schema);
1014 }
1015 _ => { }
1016 }
1017 }
1018 Err(create_error(format!(
1019 "Cannot find a Fixed(size = 16, name = \"u128\") schema in {:?}",
1020 union_schema.schemas
1021 )))
1022 }
1023 expected => Err(create_error(format!("Expected {expected}. Got: u128"))),
1024 }
1025 }
1026
1027 fn serialize_f32_with_schema(&mut self, value: f32, schema: &Schema) -> Result<usize, Error> {
1028 let create_error = |cause: String| {
1029 Error::new(Details::SerializeValueWithSchema {
1030 value_type: "f32",
1031 value: format!("{value}. Cause: {cause}"),
1032 schema: schema.clone(),
1033 })
1034 };
1035
1036 match schema {
1037 Schema::Float => self
1038 .writer
1039 .write(&value.to_le_bytes())
1040 .map_err(|e| Details::WriteBytes(e).into()),
1041 Schema::Double => self
1042 .writer
1043 .write(&(value as f64).to_le_bytes())
1044 .map_err(|e| Details::WriteBytes(e).into()),
1045 Schema::Union(union_schema) => {
1046 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1047 match variant_schema {
1048 Schema::Float | Schema::Double => {
1049 encode_int(i as i32, &mut *self.writer)?;
1050 return self.serialize_f32_with_schema(value, variant_schema);
1051 }
1052 _ => { }
1053 }
1054 }
1055 Err(create_error(format!(
1056 "Cannot find a Float schema in {:?}",
1057 union_schema.schemas
1058 )))
1059 }
1060 expected => Err(create_error(format!("Expected: {expected}. Got: Float"))),
1061 }
1062 }
1063
1064 fn serialize_f64_with_schema(&mut self, value: f64, schema: &Schema) -> Result<usize, Error> {
1065 let create_error = |cause: String| {
1066 Error::new(Details::SerializeValueWithSchema {
1067 value_type: "f64",
1068 value: format!("{value}. Cause: {cause}"),
1069 schema: schema.clone(),
1070 })
1071 };
1072
1073 match schema {
1074 Schema::Float => self
1075 .writer
1076 .write(&(value as f32).to_le_bytes())
1077 .map_err(|e| Details::WriteBytes(e).into()),
1078 Schema::Double => self
1079 .writer
1080 .write(&value.to_le_bytes())
1081 .map_err(|e| Details::WriteBytes(e).into()),
1082 Schema::Union(union_schema) => {
1083 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1084 match variant_schema {
1085 Schema::Float | Schema::Double => {
1086 encode_int(i as i32, &mut *self.writer)?;
1087 return self.serialize_f64_with_schema(value, variant_schema);
1088 }
1089 _ => { }
1090 }
1091 }
1092 Err(create_error(format!(
1093 "Cannot find a Double schema in {:?}",
1094 union_schema.schemas
1095 )))
1096 }
1097 expected => Err(create_error(format!("Expected: {expected}. Got: Double"))),
1098 }
1099 }
1100
1101 fn serialize_char_with_schema(&mut self, value: char, schema: &Schema) -> Result<usize, Error> {
1102 let create_error = |cause: String| {
1103 Error::new(Details::SerializeValueWithSchema {
1104 value_type: "char",
1105 value: format!("{value}. Cause: {cause}"),
1106 schema: schema.clone(),
1107 })
1108 };
1109
1110 match schema {
1111 Schema::String | Schema::Bytes => self.write_bytes(String::from(value).as_bytes()),
1112 Schema::Fixed(fixed) if fixed.size == 4 && fixed.name.name == "char" => {
1113 self.writer
1114 .write_all(&u32::from(value).to_le_bytes())
1115 .map_err(Details::WriteBytes)?;
1116 Ok(4)
1117 }
1118 Schema::Union(union_schema) => {
1119 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1120 match variant_schema {
1121 Schema::String | Schema::Bytes => {
1122 encode_int(i as i32, &mut *self.writer)?;
1123 return self.serialize_char_with_schema(value, variant_schema);
1124 }
1125 Schema::Fixed(fixed) if fixed.size == 4 && fixed.name.name == "char" => {
1126 encode_int(i as i32, &mut *self.writer)?;
1127 return self.serialize_char_with_schema(value, variant_schema);
1128 }
1129 _ => { }
1130 }
1131 }
1132 Err(create_error(format!(
1133 "Cannot find a matching String, Bytes or Fixed(size = 4, name = \"char\") schema in {union_schema:?}"
1134 )))
1135 }
1136 expected => Err(create_error(format!("Expected {expected}. Got: char"))),
1137 }
1138 }
1139
1140 fn serialize_str_with_schema(&mut self, value: &str, schema: &Schema) -> Result<usize, Error> {
1141 let create_error = |cause: String| {
1142 Error::new(Details::SerializeValueWithSchema {
1143 value_type: "string",
1144 value: format!("{value}. Cause: {cause}"),
1145 schema: schema.clone(),
1146 })
1147 };
1148
1149 match schema {
1150 Schema::String | Schema::Bytes | Schema::Uuid(UuidSchema::String) => {
1151 self.write_bytes(value.as_bytes())
1152 }
1153 Schema::Uuid(UuidSchema::Bytes | UuidSchema::Fixed(_)) => {
1154 Err(create_error("Expected bytes but got a string. Did you mean to use `Schema::Uuid(UuidSchema::String)` or `utils::serde_set_human_readable(false)`?".to_string()))
1155 }
1156 Schema::BigDecimal => {
1157 let decimal_val =
1159 BigDecimal::from_str(value).map_err(|e| create_error(e.to_string()))?;
1160 let decimal_bytes = big_decimal_as_bytes(&decimal_val)?;
1161 self.write_bytes(decimal_bytes.as_slice())
1162 }
1163 Schema::Fixed(fixed_schema) => {
1164 if value.len() == fixed_schema.size {
1165 self.writer
1166 .write(value.as_bytes())
1167 .map_err(|e| Details::WriteBytes(e).into())
1168 } else {
1169 Err(create_error(format!(
1170 "Fixed schema size ({}) does not match the value length ({})",
1171 fixed_schema.size,
1172 value.len()
1173 )))
1174 }
1175 }
1176 Schema::Ref { name } => {
1177 let ref_schema = self.get_ref_schema(name)?;
1178 self.serialize_str_with_schema(value, ref_schema)
1179 }
1180 Schema::Union(union_schema) => {
1181 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1182 match variant_schema {
1183 Schema::String
1184 | Schema::Bytes
1185 | Schema::Uuid(UuidSchema::String)
1186 | Schema::Fixed(_)
1187 | Schema::Ref { name: _ } => {
1188 encode_int(i as i32, &mut *self.writer)?;
1189 return self.serialize_str_with_schema(value, variant_schema);
1190 }
1191 _ => { }
1192 }
1193 }
1194 Err(create_error(format!(
1195 "Expected one of the union variants {:?}. Got: String",
1196 union_schema.schemas
1197 )))
1198 }
1199 expected => Err(create_error(format!("Expected: {expected}. Got: String"))),
1200 }
1201 }
1202
1203 fn serialize_bytes_with_schema(
1204 &mut self,
1205 value: &[u8],
1206 schema: &Schema,
1207 ) -> Result<usize, Error> {
1208 let create_error = |cause: String| {
1209 use std::fmt::Write;
1210 let mut v_str = String::with_capacity(value.len());
1211 for b in value {
1212 if write!(&mut v_str, "{b:x}").is_err() {
1213 v_str.push_str("??");
1214 }
1215 }
1216 Error::new(Details::SerializeValueWithSchema {
1217 value_type: "bytes",
1218 value: format!("{v_str}. Cause: {cause}"),
1219 schema: schema.clone(),
1220 })
1221 };
1222
1223 match schema {
1224 Schema::String | Schema::Bytes | Schema::BigDecimal => self.write_bytes(value),
1225 Schema::Uuid(UuidSchema::Bytes) => {
1226 if value.len() == 16 {
1227 self.write_bytes(value)
1228 } else {
1229 Err(create_error(format!(
1230 "Expected 16 bytes for `Schema::Uuid(Bytes)` but got {} bytes",
1231 value.len()
1232 )))
1233 }
1234 }
1235 Schema::Uuid(UuidSchema::String) if value.len() == 16 => {
1236 Err(create_error("Expected a string, but got 16 bytes. Did you mean to use `Schema::Uuid(UuidSchema::Fixed)` or `utils::serde_set_human_readable(true)`?".to_string()))
1237 }
1238 Schema::Fixed(fixed_schema) | Schema::Uuid(UuidSchema::Fixed(fixed_schema)) => {
1239 if value.len() == fixed_schema.size {
1240 self.writer
1241 .write(value)
1242 .map_err(|e| Details::WriteBytes(e).into())
1243 } else {
1244 Err(create_error(format!(
1245 "Fixed schema size ({}) does not match the value length ({})",
1246 fixed_schema.size,
1247 value.len()
1248 )))
1249 }
1250 }
1251 Schema::Duration(_) => {
1252 if value.len() == 12 {
1253 self.writer
1254 .write(value)
1255 .map_err(|e| Details::WriteBytes(e).into())
1256 } else {
1257 Err(create_error(format!(
1258 "Duration length must be 12! Got ({})",
1259 value.len()
1260 )))
1261 }
1262 }
1263 Schema::Decimal(decimal_schema) => match &decimal_schema.inner {
1264 InnerDecimalSchema::Bytes => self.write_bytes(value),
1265 InnerDecimalSchema::Fixed(fixed_schema) => {
1266 match fixed_schema.size.checked_sub(value.len()) {
1267 Some(pad) => {
1268 let pad_val = match value.len() {
1269 0 => 0,
1270 _ => value[0],
1271 };
1272 let padding = vec![pad_val; pad];
1273 let mut bytes_written = self
1274 .writer
1275 .write(padding.as_slice())
1276 .map_err(Details::WriteBytes)?;
1277 bytes_written +=
1278 self.writer.write(value).map_err(Details::WriteBytes)?;
1279 Ok(bytes_written)
1280 }
1281 None => Err(Details::CompareFixedSizes {
1282 size: fixed_schema.size,
1283 n: value.len(),
1284 }
1285 .into()),
1286 }
1287 }
1288 },
1289 Schema::Ref { name } => {
1290 let ref_schema = self.get_ref_schema(name)?;
1291 self.serialize_bytes_with_schema(value, ref_schema)
1292 }
1293 Schema::Union(union_schema) => {
1294 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1295 match variant_schema {
1296 Schema::String
1297 | Schema::Bytes
1298 | Schema::BigDecimal
1299 | Schema::Decimal(DecimalSchema {
1300 inner: InnerDecimalSchema::Bytes,
1301 ..
1302 })
1303 | Schema::Ref { name: _ } => {
1304 encode_int(i as i32, &mut *self.writer)?;
1305 return self.serialize_bytes_with_schema(value, variant_schema);
1306 }
1307 Schema::Uuid(UuidSchema::Bytes) if value.len() == 16 => {
1308 encode_int(i as i32, &mut *self.writer)?;
1309 return self.serialize_bytes_with_schema(value, variant_schema);
1310 }
1311 Schema::Fixed(fixed) | Schema::Uuid(UuidSchema::Fixed(fixed))
1312 if fixed.size == value.len() =>
1313 {
1314 encode_int(i as i32, &mut *self.writer)?;
1315 return self.serialize_bytes_with_schema(value, variant_schema);
1316 }
1317 Schema::Decimal(DecimalSchema {
1318 inner: InnerDecimalSchema::Fixed(fixed),
1319 ..
1320 }) if fixed.size >= value.len() => {
1321 encode_int(i as i32, &mut *self.writer)?;
1322 return self.serialize_bytes_with_schema(value, variant_schema);
1323 }
1324 Schema::Duration(_) if value.len() == 12 => {
1325 encode_int(i as i32, &mut *self.writer)?;
1326 return self.serialize_bytes_with_schema(value, variant_schema);
1327 }
1328 _ => { }
1329 }
1330 }
1331 Err(create_error(format!(
1332 "Cannot find a matching String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal or Ref schema in {union_schema:?}"
1333 )))
1334 }
1335 unsupported => Err(create_error(format!(
1336 "Expected String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal, Ref or Union schema. Got: {unsupported}"
1337 ))),
1338 }
1339 }
1340
1341 fn serialize_none_with_schema(&mut self, schema: &Schema) -> Result<usize, Error> {
1342 let create_error = |cause: String| {
1343 Error::new(Details::SerializeValueWithSchema {
1344 value_type: "none",
1345 value: format!("None. Cause: {cause}"),
1346 schema: schema.clone(),
1347 })
1348 };
1349
1350 match schema {
1351 Schema::Null => Ok(0),
1352 Schema::Union(union_schema) => {
1353 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1354 match variant_schema {
1355 Schema::Null => {
1356 return encode_int(i as i32, &mut *self.writer);
1357 }
1358 _ => { }
1359 }
1360 }
1361 Err(create_error(format!(
1362 "Cannot find a matching Null schema in {:?}",
1363 union_schema.schemas
1364 )))
1365 }
1366 expected => Err(create_error(format!("Expected: {expected}. Got: Null"))),
1367 }
1368 }
1369
1370 fn serialize_some_with_schema<T>(&mut self, value: &T, schema: &Schema) -> Result<usize, Error>
1371 where
1372 T: ?Sized + ser::Serialize,
1373 {
1374 let mut inner_ser = SchemaAwareWriteSerializer::new(
1375 &mut *self.writer,
1376 schema,
1377 self.names,
1378 self.enclosing_namespace.clone(),
1379 );
1380 value.serialize(&mut inner_ser)
1381 }
1382
1383 fn serialize_unit_struct_with_schema(
1384 &mut self,
1385 name: &'static str,
1386 schema: &Schema,
1387 ) -> Result<usize, Error> {
1388 let create_error = |cause: String| {
1389 Error::new(Details::SerializeValueWithSchema {
1390 value_type: "unit struct",
1391 value: format!("{name}. Cause: {cause}"),
1392 schema: schema.clone(),
1393 })
1394 };
1395
1396 match schema {
1397 Schema::Record(sch) => match sch.fields.len() {
1398 0 => Ok(0),
1399 too_many => Err(create_error(format!(
1400 "Too many fields: {too_many}. Expected: 0"
1401 ))),
1402 },
1403 Schema::Null => Ok(0),
1404 Schema::Ref { name: ref_name } => {
1405 let ref_schema = self.get_ref_schema(ref_name)?;
1406 self.serialize_unit_struct_with_schema(name, ref_schema)
1407 }
1408 Schema::Union(union_schema) => {
1409 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1410 match variant_schema {
1411 Schema::Record(record_schema) if record_schema.fields.is_empty() => {
1412 encode_int(i as i32, &mut *self.writer)?;
1413 return self.serialize_unit_struct_with_schema(name, variant_schema);
1414 }
1415 Schema::Null | Schema::Ref { name: _ } => {
1416 encode_int(i as i32, &mut *self.writer)?;
1417 return self.serialize_unit_struct_with_schema(name, variant_schema);
1418 }
1419 _ => { }
1420 }
1421 }
1422 Err(create_error(format!(
1423 "Cannot find a matching Null schema in {union_schema:?}"
1424 )))
1425 }
1426 unsupported => Err(create_error(format!(
1427 "Expected Null or Union schema. Got: {unsupported}"
1428 ))),
1429 }
1430 }
1431
1432 fn serialize_unit_variant_with_schema(
1433 &mut self,
1434 name: &'static str,
1435 variant_index: u32,
1436 variant: &'static str,
1437 schema: &Schema,
1438 ) -> Result<usize, Error> {
1439 let create_error = |cause: String| {
1440 Error::new(Details::SerializeValueWithSchema {
1441 value_type: "unit variant",
1442 value: format!("{name}::{variant} (index={variant_index}). Cause: {cause}"),
1443 schema: schema.clone(),
1444 })
1445 };
1446
1447 match schema {
1448 Schema::Enum(enum_schema) => {
1449 if variant_index as usize >= enum_schema.symbols.len() {
1450 return Err(create_error(format!(
1451 "Variant index out of bounds: {}. The Enum schema has '{}' symbols",
1452 variant_index,
1453 enum_schema.symbols.len()
1454 )));
1455 }
1456
1457 encode_int(variant_index as i32, &mut self.writer)
1458 }
1459 Schema::Union(union_schema) => {
1460 if variant_index as usize >= union_schema.schemas.len() {
1461 return Err(create_error(format!(
1462 "Variant index out of bounds: {}. The union schema has '{}' schemas",
1463 variant_index,
1464 union_schema.schemas.len()
1465 )));
1466 }
1467
1468 encode_int(variant_index as i32, &mut self.writer)?;
1469 self.serialize_unit_struct_with_schema(
1470 name,
1471 &union_schema.schemas[variant_index as usize],
1472 )
1473 }
1474 Schema::Ref { name: ref_name } => {
1475 let ref_schema = self.get_ref_schema(ref_name)?;
1476 self.serialize_unit_variant_with_schema(name, variant_index, variant, ref_schema)
1477 }
1478 unsupported => Err(create_error(format!(
1479 "Unsupported schema: {unsupported:?}. Expected: Enum, Union or Ref"
1480 ))),
1481 }
1482 }
1483
1484 fn serialize_newtype_struct_with_schema<T>(
1485 &mut self,
1486 _name: &'static str,
1487 value: &T,
1488 schema: &Schema,
1489 ) -> Result<usize, Error>
1490 where
1491 T: ?Sized + ser::Serialize,
1492 {
1493 let mut inner_ser = SchemaAwareWriteSerializer::new(
1494 &mut *self.writer,
1495 schema,
1496 self.names,
1497 self.enclosing_namespace.clone(),
1498 );
1499 value.serialize(&mut inner_ser)
1501 }
1502
1503 fn serialize_newtype_variant_with_schema<T>(
1504 &mut self,
1505 name: &'static str,
1506 variant_index: u32,
1507 variant: &'static str,
1508 value: &T,
1509 schema: &Schema,
1510 ) -> Result<usize, Error>
1511 where
1512 T: ?Sized + ser::Serialize,
1513 {
1514 let create_error = |cause: String| {
1515 Error::new(Details::SerializeValueWithSchema {
1516 value_type: "newtype variant",
1517 value: format!("{name}::{variant}(?) (index={variant_index}). Cause: {cause}"),
1518 schema: schema.clone(),
1519 })
1520 };
1521
1522 match schema {
1523 Schema::Union(union_schema) => {
1524 let variant_schema = union_schema
1525 .schemas
1526 .get(variant_index as usize)
1527 .ok_or_else(|| {
1528 create_error(format!(
1529 "No variant schema at position {variant_index} for {union_schema:?}"
1530 ))
1531 })?;
1532
1533 encode_int(variant_index as i32, &mut self.writer)?;
1534 self.serialize_newtype_struct_with_schema(variant, value, variant_schema)
1535 }
1536 _ => Err(create_error(format!(
1537 "Expected Union schema. Got: {schema}"
1538 ))),
1539 }
1540 }
1541
1542 fn serialize_seq_with_schema<'a>(
1543 &'a mut self,
1544 len: Option<usize>,
1545 schema: &'s Schema,
1546 ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> {
1547 let create_error = |cause: String| {
1548 let len_str = len
1549 .map(|l| format!("{l}"))
1550 .unwrap_or_else(|| String::from("?"));
1551
1552 Error::new(Details::SerializeValueWithSchema {
1553 value_type: "sequence",
1554 value: format!("sequence (len={len_str}). Cause: {cause}"),
1555 schema: schema.clone(),
1556 })
1557 };
1558
1559 match schema {
1560 Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new(
1561 self,
1562 array_schema.items.as_ref(),
1563 len,
1564 )),
1565 Schema::Union(union_schema) => {
1566 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1567 match variant_schema {
1568 Schema::Array(_) => {
1569 encode_int(i as i32, &mut *self.writer)?;
1570 return self.serialize_seq_with_schema(len, variant_schema);
1571 }
1572 _ => { }
1573 }
1574 }
1575 Err(create_error(format!(
1576 "Expected Array schema in {union_schema:?}"
1577 )))
1578 }
1579 _ => Err(create_error(format!("Expected: {schema}. Got: Array"))),
1580 }
1581 }
1582
1583 fn serialize_tuple_with_schema<'a>(
1584 &'a mut self,
1585 len: usize,
1586 schema: &'s Schema,
1587 ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> {
1588 let create_error = |cause: String| {
1589 Error::new(Details::SerializeValueWithSchema {
1590 value_type: "tuple",
1591 value: format!("tuple (len={len}). Cause: {cause}"),
1592 schema: schema.clone(),
1593 })
1594 };
1595
1596 match schema {
1597 Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new(
1598 self,
1599 array_schema.items.as_ref(),
1600 Some(len),
1601 )),
1602 Schema::Union(union_schema) => {
1603 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1604 match variant_schema {
1605 Schema::Array(_) => {
1606 encode_int(i as i32, &mut *self.writer)?;
1607 return self.serialize_tuple_with_schema(len, variant_schema);
1608 }
1609 _ => { }
1610 }
1611 }
1612 Err(create_error(format!(
1613 "Expected Array schema in {union_schema:?}"
1614 )))
1615 }
1616 _ => Err(create_error(format!("Expected: {schema}. Got: Array"))),
1617 }
1618 }
1619
1620 fn serialize_tuple_struct_with_schema<'a>(
1621 &'a mut self,
1622 name: &'static str,
1623 len: usize,
1624 schema: &'s Schema,
1625 ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> {
1626 let create_error = |cause: String| {
1627 Error::new(Details::SerializeValueWithSchema {
1628 value_type: "tuple struct",
1629 value: format!(
1630 "{name}({}). Cause: {cause}",
1631 vec!["?"; len].as_slice().join(",")
1632 ),
1633 schema: schema.clone(),
1634 })
1635 };
1636
1637 match schema {
1638 Schema::Array(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Array(
1639 SchemaAwareWriteSerializeSeq::new(self, &sch.items, Some(len)),
1640 )),
1641 Schema::Record(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Record(
1642 SchemaAwareWriteSerializeStruct::new(self, sch),
1643 )),
1644 Schema::Ref { name: ref_name } => {
1645 let ref_schema = self.get_ref_schema(ref_name)?;
1646 self.serialize_tuple_struct_with_schema(name, len, ref_schema)
1647 }
1648 Schema::Union(union_schema) => {
1649 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1650 match variant_schema {
1651 Schema::Record(inner) => {
1652 if inner.fields.len() == len {
1653 encode_int(i as i32, &mut *self.writer)?;
1654 return self.serialize_tuple_struct_with_schema(
1655 name,
1656 len,
1657 variant_schema,
1658 );
1659 }
1660 }
1661 Schema::Array(_) | Schema::Ref { name: _ } => {
1662 encode_int(i as i32, &mut *self.writer)?;
1663 return self.serialize_tuple_struct_with_schema(
1664 name,
1665 len,
1666 variant_schema,
1667 );
1668 }
1669 _ => { }
1670 }
1671 }
1672 Err(create_error(format!(
1673 "Expected Record, Array or Ref schema in {union_schema:?}"
1674 )))
1675 }
1676 _ => Err(create_error(format!(
1677 "Expected Record, Array, Ref or Union schema. Got: {schema}"
1678 ))),
1679 }
1680 }
1681
1682 fn serialize_tuple_variant_with_schema<'a>(
1683 &'a mut self,
1684 name: &'static str,
1685 variant_index: u32,
1686 variant: &'static str,
1687 len: usize,
1688 schema: &'s Schema,
1689 ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> {
1690 let create_error = |cause: String| {
1691 Error::new(Details::SerializeValueWithSchema {
1692 value_type: "tuple variant",
1693 value: format!(
1694 "{name}::{variant}({}) (index={variant_index}). Cause: {cause}",
1695 vec!["?"; len].as_slice().join(",")
1696 ),
1697 schema: schema.clone(),
1698 })
1699 };
1700
1701 match schema {
1702 Schema::Union(union_schema) => {
1703 let variant_schema = union_schema
1704 .schemas
1705 .get(variant_index as usize)
1706 .ok_or_else(|| {
1707 create_error(format!(
1708 "Cannot find a variant at position {variant_index} in {union_schema:?}"
1709 ))
1710 })?;
1711
1712 encode_int(variant_index as i32, &mut self.writer)?;
1713 self.serialize_tuple_struct_with_schema(variant, len, variant_schema)
1714 }
1715 _ => Err(create_error(format!(
1716 "Expected Union schema. Got: {schema}"
1717 ))),
1718 }
1719 }
1720
1721 fn serialize_map_with_schema<'a>(
1722 &'a mut self,
1723 len: Option<usize>,
1724 schema: &'s Schema,
1725 ) -> Result<SchemaAwareWriteSerializeMapOrStruct<'a, 's, W>, Error> {
1726 let create_error = |cause: String| {
1727 let len_str = len
1728 .map(|l| format!("{l}"))
1729 .unwrap_or_else(|| String::from("?"));
1730
1731 Error::new(Details::SerializeValueWithSchema {
1732 value_type: "map",
1733 value: format!("map (size={len_str}). Cause: {cause}"),
1734 schema: schema.clone(),
1735 })
1736 };
1737
1738 match schema {
1739 Schema::Map(map_schema) => Ok(SchemaAwareWriteSerializeMapOrStruct::Map(
1740 SchemaAwareWriteSerializeMap::new(self, map_schema.types.as_ref(), len),
1741 )),
1742 Schema::Ref { name: ref_name } => {
1743 let ref_schema = self.get_ref_schema(ref_name)?;
1744 self.serialize_map_with_schema(len, ref_schema)
1745 }
1746 Schema::Union(union_schema) => {
1747 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1748 match variant_schema {
1749 Schema::Map(_) | Schema::Record(_) | Schema::Ref { .. } => {
1750 encode_int(i as i32, &mut *self.writer)?;
1751 return self.serialize_map_with_schema(len, variant_schema);
1752 }
1753 _ => { }
1754 }
1755 }
1756 Err(create_error(format!(
1757 "Expected a Map schema in {union_schema:?}"
1758 )))
1759 }
1760 Schema::Record(record_schema) => Ok(SchemaAwareWriteSerializeMapOrStruct::Struct(
1761 SchemaAwareWriteSerializeStruct::new(self, record_schema),
1762 )),
1763 _ => Err(create_error(format!(
1764 "Expected Map or Union schema. Got: {schema}"
1765 ))),
1766 }
1767 }
1768
1769 fn serialize_struct_with_schema<'a>(
1770 &'a mut self,
1771 name: &'static str,
1772 len: usize,
1773 schema: &'s Schema,
1774 ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> {
1775 let create_error = |cause: String| {
1776 Error::new(Details::SerializeValueWithSchema {
1777 value_type: "struct",
1778 value: format!("{name}{{ ... }}. Cause: {cause}"),
1779 schema: schema.clone(),
1780 })
1781 };
1782
1783 match schema {
1784 Schema::Record(record_schema) => {
1785 Ok(SchemaAwareWriteSerializeStruct::new(self, record_schema))
1786 }
1787 Schema::Ref { name: ref_name } => {
1788 let ref_schema = self.get_ref_schema(ref_name)?;
1789 self.serialize_struct_with_schema(name, len, ref_schema)
1790 }
1791 Schema::Union(union_schema) => {
1792 for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1793 match variant_schema {
1794 Schema::Record(inner)
1795 if inner.fields.len() == len && inner.name.name == name =>
1796 {
1797 encode_int(i as i32, &mut *self.writer)?;
1798 return self.serialize_struct_with_schema(name, len, variant_schema);
1799 }
1800 Schema::Ref { name: _ } => {
1801 encode_int(i as i32, &mut *self.writer)?;
1802 return self.serialize_struct_with_schema(name, len, variant_schema);
1803 }
1804 _ => { }
1805 }
1806 }
1807 Err(create_error(format!(
1808 "Expected Record or Ref schema in {union_schema:?}"
1809 )))
1810 }
1811 _ => Err(create_error(format!(
1812 "Expected Record, Ref or Union schema. Got: {schema}"
1813 ))),
1814 }
1815 }
1816
1817 fn serialize_struct_variant_with_schema<'a>(
1818 &'a mut self,
1819 name: &'static str,
1820 variant_index: u32,
1821 variant: &'static str,
1822 len: usize,
1823 schema: &'s Schema,
1824 ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> {
1825 let create_error = |cause: String| {
1826 Error::new(Details::SerializeValueWithSchema {
1827 value_type: "struct variant",
1828 value: format!("{name}::{variant}{{ ... }} (size={len}. Cause: {cause})"),
1829 schema: schema.clone(),
1830 })
1831 };
1832
1833 match schema {
1834 Schema::Union(union_schema) => {
1835 let variant_schema = union_schema
1836 .schemas
1837 .get(variant_index as usize)
1838 .ok_or_else(|| {
1839 create_error(format!(
1840 "Cannot find variant at position {variant_index} in {union_schema:?}"
1841 ))
1842 })?;
1843
1844 encode_int(variant_index as i32, &mut self.writer)?;
1845 self.serialize_struct_with_schema(variant, len, variant_schema)
1846 }
1847 _ => Err(create_error(format!(
1848 "Expected Union schema. Got: {schema}"
1849 ))),
1850 }
1851 }
1852}
1853
1854impl<'a, 's, W: Write> ser::Serializer for &'a mut SchemaAwareWriteSerializer<'s, W> {
1855 type Ok = usize;
1856 type Error = Error;
1857 type SerializeSeq = SchemaAwareWriteSerializeSeq<'a, 's, W>;
1858 type SerializeTuple = SchemaAwareWriteSerializeSeq<'a, 's, W>;
1859 type SerializeTupleStruct = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>;
1860 type SerializeTupleVariant = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>;
1861 type SerializeMap = SchemaAwareWriteSerializeMapOrStruct<'a, 's, W>;
1862 type SerializeStruct = SchemaAwareWriteSerializeStruct<'a, 's, W>;
1863 type SerializeStructVariant = SchemaAwareWriteSerializeStruct<'a, 's, W>;
1864
1865 fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
1866 self.serialize_bool_with_schema(v, self.root_schema)
1867 }
1868
1869 fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
1870 self.serialize_i32(v as i32)
1871 }
1872
1873 fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
1874 self.serialize_i32(v as i32)
1875 }
1876
1877 fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
1878 self.serialize_i32_with_schema(v, self.root_schema)
1879 }
1880
1881 fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
1882 self.serialize_i64_with_schema(v, self.root_schema)
1883 }
1884
1885 fn serialize_i128(self, v: i128) -> Result<Self::Ok, Self::Error> {
1886 self.serialize_i128_with_schema(v, self.root_schema)
1887 }
1888
1889 fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
1890 self.serialize_u8_with_schema(v, self.root_schema)
1891 }
1892
1893 fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
1894 self.serialize_u32(v as u32)
1895 }
1896
1897 fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
1898 self.serialize_u32_with_schema(v, self.root_schema)
1899 }
1900
1901 fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
1902 self.serialize_u64_with_schema(v, self.root_schema)
1903 }
1904
1905 fn serialize_u128(self, v: u128) -> Result<Self::Ok, Self::Error> {
1906 self.serialize_u128_with_schema(v, self.root_schema)
1907 }
1908
1909 fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
1910 self.serialize_f32_with_schema(v, self.root_schema)
1911 }
1912
1913 fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
1914 self.serialize_f64_with_schema(v, self.root_schema)
1915 }
1916
1917 fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> {
1918 self.serialize_char_with_schema(v, self.root_schema)
1919 }
1920
1921 fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
1922 self.serialize_str_with_schema(v, self.root_schema)
1923 }
1924
1925 fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
1926 self.serialize_bytes_with_schema(v, self.root_schema)
1927 }
1928
1929 fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
1930 self.serialize_none_with_schema(self.root_schema)
1931 }
1932
1933 fn serialize_some<T>(self, value: &T) -> Result<Self::Ok, Self::Error>
1934 where
1935 T: ?Sized + ser::Serialize,
1936 {
1937 self.serialize_some_with_schema(value, self.root_schema)
1938 }
1939
1940 fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
1941 self.serialize_none()
1942 }
1943
1944 fn serialize_unit_struct(self, name: &'static str) -> Result<Self::Ok, Self::Error> {
1945 self.serialize_unit_struct_with_schema(name, self.root_schema)
1946 }
1947
1948 fn serialize_unit_variant(
1949 self,
1950 name: &'static str,
1951 variant_index: u32,
1952 variant: &'static str,
1953 ) -> Result<Self::Ok, Self::Error> {
1954 self.serialize_unit_variant_with_schema(name, variant_index, variant, self.root_schema)
1955 }
1956
1957 fn serialize_newtype_struct<T>(
1958 self,
1959 name: &'static str,
1960 value: &T,
1961 ) -> Result<Self::Ok, Self::Error>
1962 where
1963 T: ?Sized + ser::Serialize,
1964 {
1965 self.serialize_newtype_struct_with_schema(name, value, self.root_schema)
1966 }
1967
1968 fn serialize_newtype_variant<T>(
1969 self,
1970 name: &'static str,
1971 variant_index: u32,
1972 variant: &'static str,
1973 value: &T,
1974 ) -> Result<Self::Ok, Self::Error>
1975 where
1976 T: ?Sized + ser::Serialize,
1977 {
1978 self.serialize_newtype_variant_with_schema(
1979 name,
1980 variant_index,
1981 variant,
1982 value,
1983 self.root_schema,
1984 )
1985 }
1986
1987 fn serialize_seq(self, len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
1988 self.serialize_seq_with_schema(len, self.root_schema)
1989 }
1990
1991 fn serialize_tuple(self, len: usize) -> Result<Self::SerializeTuple, Self::Error> {
1992 self.serialize_tuple_with_schema(len, self.root_schema)
1993 }
1994
1995 fn serialize_tuple_struct(
1996 self,
1997 name: &'static str,
1998 len: usize,
1999 ) -> Result<Self::SerializeTupleStruct, Self::Error> {
2000 self.serialize_tuple_struct_with_schema(name, len, self.root_schema)
2001 }
2002
2003 fn serialize_tuple_variant(
2004 self,
2005 name: &'static str,
2006 variant_index: u32,
2007 variant: &'static str,
2008 len: usize,
2009 ) -> Result<Self::SerializeTupleVariant, Self::Error> {
2010 self.serialize_tuple_variant_with_schema(
2011 name,
2012 variant_index,
2013 variant,
2014 len,
2015 self.root_schema,
2016 )
2017 }
2018
2019 fn serialize_map(self, len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
2020 self.serialize_map_with_schema(len, self.root_schema)
2021 }
2022
2023 fn serialize_struct(
2024 self,
2025 name: &'static str,
2026 len: usize,
2027 ) -> Result<Self::SerializeStruct, Self::Error> {
2028 self.serialize_struct_with_schema(name, len, self.root_schema)
2029 }
2030
2031 fn serialize_struct_variant(
2032 self,
2033 name: &'static str,
2034 variant_index: u32,
2035 variant: &'static str,
2036 len: usize,
2037 ) -> Result<Self::SerializeStructVariant, Self::Error> {
2038 self.serialize_struct_variant_with_schema(
2039 name,
2040 variant_index,
2041 variant,
2042 len,
2043 self.root_schema,
2044 )
2045 }
2046
2047 fn is_human_readable(&self) -> bool {
2048 crate::util::is_human_readable()
2049 }
2050}
2051
2052#[cfg(test)]
2053mod tests {
2054 use super::*;
2055 use crate::schema::FixedSchema;
2056 use crate::{
2057 Days, Duration, Millis, Months, Reader, Writer, decimal::Decimal, error::Details,
2058 from_value, schema::ResolvedSchema,
2059 };
2060 use apache_avro_test_helper::TestResult;
2061 use bigdecimal::BigDecimal;
2062 use num_bigint::{BigInt, Sign};
2063 use serde::{Deserialize, Serialize};
2064 use serde_bytes::{ByteArray, Bytes};
2065 use std::{
2066 collections::{BTreeMap, HashMap},
2067 marker::PhantomData,
2068 };
2069 use uuid::Uuid;
2070
2071 #[test]
2072 fn test_serialize_null() -> TestResult {
2073 let schema = Schema::Null;
2074 let mut buffer: Vec<u8> = Vec::new();
2075 let names = HashMap::new();
2076 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2077
2078 ().serialize(&mut serializer)?;
2079 None::<()>.serialize(&mut serializer)?;
2080 None::<i32>.serialize(&mut serializer)?;
2081 None::<String>.serialize(&mut serializer)?;
2082 assert!("".serialize(&mut serializer).is_err());
2083 assert!(Some("").serialize(&mut serializer).is_err());
2084
2085 assert_eq!(buffer.as_slice(), Vec::<u8>::new().as_slice());
2086
2087 Ok(())
2088 }
2089
2090 #[test]
2091 fn test_serialize_bool() -> TestResult {
2092 let schema = Schema::Boolean;
2093 let mut buffer: Vec<u8> = Vec::new();
2094 let names = HashMap::new();
2095 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2096
2097 true.serialize(&mut serializer)?;
2098 false.serialize(&mut serializer)?;
2099 assert!("".serialize(&mut serializer).is_err());
2100 assert!(Some("").serialize(&mut serializer).is_err());
2101
2102 assert_eq!(buffer.as_slice(), &[1, 0]);
2103
2104 Ok(())
2105 }
2106
2107 #[test]
2108 fn test_serialize_int() -> TestResult {
2109 let schema = Schema::Int;
2110 let mut buffer: Vec<u8> = Vec::new();
2111 let names = HashMap::new();
2112 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2113
2114 4u8.serialize(&mut serializer)?;
2115 31u16.serialize(&mut serializer)?;
2116 13u32.serialize(&mut serializer)?;
2117 7i8.serialize(&mut serializer)?;
2118 (-57i16).serialize(&mut serializer)?;
2119 129i32.serialize(&mut serializer)?;
2120 assert!("".serialize(&mut serializer).is_err());
2121 assert!(Some("").serialize(&mut serializer).is_err());
2122
2123 assert_eq!(buffer.as_slice(), &[8, 62, 26, 14, 113, 130, 2]);
2124
2125 Ok(())
2126 }
2127
2128 #[test]
2129 fn test_serialize_long() -> TestResult {
2130 let schema = Schema::Long;
2131 let mut buffer: Vec<u8> = Vec::new();
2132 let names = HashMap::new();
2133 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2134
2135 4u8.serialize(&mut serializer)?;
2136 31u16.serialize(&mut serializer)?;
2137 13u32.serialize(&mut serializer)?;
2138 291u64.serialize(&mut serializer)?;
2139 7i8.serialize(&mut serializer)?;
2140 (-57i16).serialize(&mut serializer)?;
2141 129i32.serialize(&mut serializer)?;
2142 (-432i64).serialize(&mut serializer)?;
2143 assert!("".serialize(&mut serializer).is_err());
2144 assert!(Some("").serialize(&mut serializer).is_err());
2145
2146 assert_eq!(
2147 buffer.as_slice(),
2148 &[8, 62, 26, 198, 4, 14, 113, 130, 2, 223, 6]
2149 );
2150
2151 Ok(())
2152 }
2153
2154 #[test]
2155 fn test_serialize_float() -> TestResult {
2156 let schema = Schema::Float;
2157 let mut buffer: Vec<u8> = Vec::new();
2158 let names = HashMap::new();
2159 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2160
2161 4.7f32.serialize(&mut serializer)?;
2162 (-14.1f64).serialize(&mut serializer)?;
2163 assert!("".serialize(&mut serializer).is_err());
2164 assert!(Some("").serialize(&mut serializer).is_err());
2165
2166 assert_eq!(buffer.as_slice(), &[102, 102, 150, 64, 154, 153, 97, 193]);
2167
2168 Ok(())
2169 }
2170
2171 #[test]
2172 fn test_serialize_double() -> TestResult {
2173 let schema = Schema::Float;
2174 let mut buffer: Vec<u8> = Vec::new();
2175 let names = HashMap::new();
2176 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2177
2178 4.7f32.serialize(&mut serializer)?;
2179 (-14.1f64).serialize(&mut serializer)?;
2180 assert!("".serialize(&mut serializer).is_err());
2181 assert!(Some("").serialize(&mut serializer).is_err());
2182
2183 assert_eq!(buffer.as_slice(), &[102, 102, 150, 64, 154, 153, 97, 193]);
2184
2185 Ok(())
2186 }
2187
2188 #[test]
2189 fn test_serialize_bytes() -> TestResult {
2190 let schema = Schema::Bytes;
2191 let mut buffer: Vec<u8> = Vec::new();
2192 let names = HashMap::new();
2193 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2194
2195 'a'.serialize(&mut serializer)?;
2196 "test".serialize(&mut serializer)?;
2197 Bytes::new(&[12, 3, 7, 91, 4]).serialize(&mut serializer)?;
2198 assert!(().serialize(&mut serializer).is_err());
2199 assert!(PhantomData::<String>.serialize(&mut serializer).is_err());
2200
2201 assert_eq!(
2202 buffer.as_slice(),
2203 &[2, b'a', 8, b't', b'e', b's', b't', 10, 12, 3, 7, 91, 4]
2204 );
2205
2206 Ok(())
2207 }
2208
2209 #[test]
2210 fn test_serialize_string() -> TestResult {
2211 let schema = Schema::String;
2212 let mut buffer: Vec<u8> = Vec::new();
2213 let names = HashMap::new();
2214 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2215
2216 'a'.serialize(&mut serializer)?;
2217 "test".serialize(&mut serializer)?;
2218 Bytes::new(&[12, 3, 7, 91, 4]).serialize(&mut serializer)?;
2219 assert!(().serialize(&mut serializer).is_err());
2220 assert!(PhantomData::<String>.serialize(&mut serializer).is_err());
2221
2222 assert_eq!(
2223 buffer.as_slice(),
2224 &[2, b'a', 8, b't', b'e', b's', b't', 10, 12, 3, 7, 91, 4]
2225 );
2226
2227 Ok(())
2228 }
2229
2230 #[test]
2231 fn test_serialize_record() -> TestResult {
2232 let schema = Schema::parse_str(
2233 r#"{
2234 "type": "record",
2235 "name": "TestRecord",
2236 "fields": [
2237 {"name": "stringField", "type": "string"},
2238 {"name": "intField", "type": "int"}
2239 ]
2240 }"#,
2241 )?;
2242
2243 #[derive(Serialize)]
2244 #[serde(rename_all = "camelCase")]
2245 struct GoodTestRecord {
2246 string_field: String,
2247 int_field: i32,
2248 }
2249
2250 #[derive(Serialize)]
2251 #[serde(rename_all = "camelCase")]
2252 struct BadTestRecord {
2253 foo_string_field: String,
2254 bar_int_field: i32,
2255 }
2256
2257 let mut buffer: Vec<u8> = Vec::new();
2258 let names = HashMap::new();
2259 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2260
2261 let good_record = GoodTestRecord {
2262 string_field: String::from("test"),
2263 int_field: 10,
2264 };
2265 good_record.serialize(&mut serializer)?;
2266
2267 let bad_record = BadTestRecord {
2268 foo_string_field: String::from("test"),
2269 bar_int_field: 10,
2270 };
2271 assert!(bad_record.serialize(&mut serializer).is_err());
2272
2273 assert!("".serialize(&mut serializer).is_err());
2274 assert!(Some("").serialize(&mut serializer).is_err());
2275
2276 assert_eq!(buffer.as_slice(), &[8, b't', b'e', b's', b't', 20]);
2277
2278 Ok(())
2279 }
2280
2281 #[test]
2282 fn test_serialize_empty_record() -> TestResult {
2283 let schema = Schema::parse_str(
2284 r#"{
2285 "type": "record",
2286 "name": "EmptyRecord",
2287 "fields": []
2288 }"#,
2289 )?;
2290
2291 let mut buffer: Vec<u8> = Vec::new();
2292 let names = HashMap::new();
2293 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2294
2295 #[derive(Serialize)]
2296 struct EmptyRecord;
2297 EmptyRecord.serialize(&mut serializer)?;
2298
2299 #[derive(Serialize)]
2300 struct NonEmptyRecord {
2301 foo: String,
2302 }
2303 let record = NonEmptyRecord {
2304 foo: "bar".to_string(),
2305 };
2306 match record
2307 .serialize(&mut serializer)
2308 .map_err(Error::into_details)
2309 {
2310 Err(Details::FieldName(field_name)) if field_name == "foo" => (),
2311 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2312 }
2313
2314 match ().serialize(&mut serializer).map_err(Error::into_details) {
2315 Err(Details::SerializeValueWithSchema {
2316 value_type,
2317 value,
2318 schema,
2319 }) => {
2320 assert_eq!(value_type, "none"); assert_eq!(value, "None. Cause: Expected: Record. Got: Null");
2322 assert_eq!(schema, schema);
2323 }
2324 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2325 }
2326
2327 assert_eq!(buffer.len(), 0);
2328
2329 Ok(())
2330 }
2331
2332 #[test]
2333 fn test_serialize_enum() -> TestResult {
2334 let schema = Schema::parse_str(
2335 r#"{
2336 "type": "enum",
2337 "name": "Suit",
2338 "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
2339 }"#,
2340 )?;
2341
2342 #[derive(Serialize)]
2343 enum Suit {
2344 Spades,
2345 Hearts,
2346 Diamonds,
2347 Clubs,
2348 }
2349
2350 let mut buffer: Vec<u8> = Vec::new();
2351 let names = HashMap::new();
2352 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2353
2354 Suit::Spades.serialize(&mut serializer)?;
2355 Suit::Hearts.serialize(&mut serializer)?;
2356 Suit::Diamonds.serialize(&mut serializer)?;
2357 Suit::Clubs.serialize(&mut serializer)?;
2358 match None::<()>
2359 .serialize(&mut serializer)
2360 .map_err(Error::into_details)
2361 {
2362 Err(Details::SerializeValueWithSchema {
2363 value_type,
2364 value,
2365 schema,
2366 }) => {
2367 assert_eq!(value_type, "none");
2368 assert_eq!(value, "None. Cause: Expected: Enum. Got: Null");
2369 assert_eq!(schema, schema);
2370 }
2371 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2372 }
2373
2374 assert_eq!(buffer.as_slice(), &[0, 2, 4, 6]);
2375
2376 Ok(())
2377 }
2378
2379 #[test]
2380 fn test_serialize_array() -> TestResult {
2381 let schema = Schema::parse_str(
2382 r#"{
2383 "type": "array",
2384 "items": "long"
2385 }"#,
2386 )?;
2387
2388 let mut buffer: Vec<u8> = Vec::new();
2389 let names = HashMap::new();
2390 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2391
2392 let arr: Vec<i64> = vec![10, 5, 400];
2393 arr.serialize(&mut serializer)?;
2394
2395 match vec![1_f32]
2396 .serialize(&mut serializer)
2397 .map_err(Error::into_details)
2398 {
2399 Err(Details::SerializeValueWithSchema {
2400 value_type,
2401 value,
2402 schema,
2403 }) => {
2404 assert_eq!(value_type, "f32");
2405 assert_eq!(value, "1. Cause: Expected: Long. Got: Float");
2406 assert_eq!(schema, schema);
2407 }
2408 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2409 }
2410
2411 assert_eq!(buffer.as_slice(), &[6, 20, 10, 160, 6, 0]);
2412
2413 Ok(())
2414 }
2415
2416 #[test]
2417 fn test_serialize_map() -> TestResult {
2418 let schema = Schema::parse_str(
2419 r#"{
2420 "type": "map",
2421 "values": "long"
2422 }"#,
2423 )?;
2424
2425 let mut buffer: Vec<u8> = Vec::new();
2426 let names = HashMap::new();
2427 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2428
2429 let mut map: BTreeMap<String, i64> = BTreeMap::new();
2430 map.insert(String::from("item1"), 10);
2431 map.insert(String::from("item2"), 400);
2432
2433 map.serialize(&mut serializer)?;
2434
2435 let mut map: BTreeMap<String, &str> = BTreeMap::new();
2436 map.insert(String::from("item1"), "value1");
2437 match map.serialize(&mut serializer).map_err(Error::into_details) {
2438 Err(Details::SerializeValueWithSchema {
2439 value_type,
2440 value,
2441 schema,
2442 }) => {
2443 assert_eq!(value_type, "string");
2444 assert_eq!(value, "value1. Cause: Expected: Long. Got: String");
2445 assert_eq!(schema, schema);
2446 }
2447 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2448 }
2449
2450 assert_eq!(
2451 buffer.as_slice(),
2452 &[
2453 4, 10, b'i', b't', b'e', b'm', b'1', 20, 10, b'i', b't', b'e', b'm', b'2', 160, 6,
2454 0
2455 ]
2456 );
2457
2458 Ok(())
2459 }
2460
2461 #[test]
2462 fn test_serialize_nullable_union() -> TestResult {
2463 let schema = Schema::parse_str(
2464 r#"{
2465 "type": ["null", "long"]
2466 }"#,
2467 )?;
2468
2469 #[derive(Serialize)]
2470 enum NullableLong {
2471 Null,
2472 Long(i64),
2473 }
2474
2475 let mut buffer: Vec<u8> = Vec::new();
2476 let names = HashMap::new();
2477 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2478
2479 Some(10i64).serialize(&mut serializer)?;
2480 None::<i64>.serialize(&mut serializer)?;
2481 NullableLong::Long(400).serialize(&mut serializer)?;
2482 NullableLong::Null.serialize(&mut serializer)?;
2483
2484 match "invalid"
2485 .serialize(&mut serializer)
2486 .map_err(Error::into_details)
2487 {
2488 Err(Details::SerializeValueWithSchema {
2489 value_type,
2490 value,
2491 schema,
2492 }) => {
2493 assert_eq!(value_type, "string");
2494 assert_eq!(
2495 value,
2496 "invalid. Cause: Expected one of the union variants [Null, Long]. Got: String"
2497 );
2498 assert_eq!(schema, schema);
2499 }
2500 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2501 }
2502
2503 assert_eq!(buffer.as_slice(), &[2, 20, 0, 2, 160, 6, 0]);
2504
2505 Ok(())
2506 }
2507
2508 #[test]
2509 fn test_serialize_union() -> TestResult {
2510 let schema = Schema::parse_str(
2511 r#"{
2512 "type": ["null", "long", "string"]
2513 }"#,
2514 )?;
2515
2516 #[derive(Serialize)]
2517 enum LongOrString {
2518 Null,
2519 Long(i64),
2520 Str(String),
2521 }
2522
2523 let mut buffer: Vec<u8> = Vec::new();
2524 let names = HashMap::new();
2525 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2526
2527 LongOrString::Null.serialize(&mut serializer)?;
2528 LongOrString::Long(400).serialize(&mut serializer)?;
2529 LongOrString::Str(String::from("test")).serialize(&mut serializer)?;
2530
2531 match 1_f64
2532 .serialize(&mut serializer)
2533 .map_err(Error::into_details)
2534 {
2535 Err(Details::SerializeValueWithSchema {
2536 value_type,
2537 value,
2538 schema,
2539 }) => {
2540 assert_eq!(value_type, "f64");
2541 assert_eq!(
2542 value,
2543 "1. Cause: Cannot find a Double schema in [Null, Long, String]"
2544 );
2545 assert_eq!(schema, schema);
2546 }
2547 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2548 }
2549
2550 assert_eq!(
2551 buffer.as_slice(),
2552 &[0, 2, 160, 6, 4, 8, b't', b'e', b's', b't']
2553 );
2554
2555 Ok(())
2556 }
2557
2558 #[test]
2559 fn test_serialize_fixed() -> TestResult {
2560 let schema = Schema::parse_str(
2561 r#"{
2562 "type": "fixed",
2563 "size": 8,
2564 "name": "LongVal"
2565 }"#,
2566 )?;
2567
2568 let mut buffer: Vec<u8> = Vec::new();
2569 let names = HashMap::new();
2570 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2571
2572 Bytes::new(&[10, 124, 31, 97, 14, 201, 3, 88]).serialize(&mut serializer)?;
2573
2574 match Bytes::new(&[123])
2576 .serialize(&mut serializer)
2577 .map_err(Error::into_details)
2578 {
2579 Err(Details::SerializeValueWithSchema {
2580 value_type,
2581 value,
2582 schema,
2583 }) => {
2584 assert_eq!(value_type, "bytes");
2585 assert_eq!(
2586 value,
2587 "7b. Cause: Fixed schema size (8) does not match the value length (1)"
2588 ); assert_eq!(schema, schema);
2590 }
2591 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2592 }
2593
2594 match [1; 8]
2596 .serialize(&mut serializer)
2597 .map_err(Error::into_details)
2598 {
2599 Err(Details::SerializeValueWithSchema {
2600 value_type,
2601 value,
2602 schema,
2603 }) => {
2604 assert_eq!(value_type, "tuple"); assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array");
2606 assert_eq!(schema, schema);
2607 }
2608 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2609 }
2610
2611 match &[1, 2, 3, 4, 5, 6, 7, 8]
2613 .serialize(&mut serializer)
2614 .map_err(Error::into_details)
2615 {
2616 Err(Details::SerializeValueWithSchema {
2617 value_type,
2618 value,
2619 schema,
2620 }) => {
2621 assert_eq!(*value_type, "tuple"); assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array");
2623 assert_eq!(schema, schema);
2624 }
2625 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2626 }
2627
2628 assert_eq!(buffer.as_slice(), &[10, 124, 31, 97, 14, 201, 3, 88]);
2629
2630 Ok(())
2631 }
2632
2633 #[test]
2634 fn test_serialize_decimal_bytes() -> TestResult {
2635 let schema = Schema::parse_str(
2636 r#"{
2637 "type": "bytes",
2638 "logicalType": "decimal",
2639 "precision": 16,
2640 "scale": 2
2641 }"#,
2642 )?;
2643
2644 let mut buffer: Vec<u8> = Vec::new();
2645 let names = HashMap::new();
2646 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2647
2648 let val = Decimal::from(&[251, 155]);
2649 val.serialize(&mut serializer)?;
2650
2651 match ().serialize(&mut serializer).map_err(Error::into_details) {
2652 Err(Details::SerializeValueWithSchema {
2653 value_type,
2654 value,
2655 schema,
2656 }) => {
2657 assert_eq!(value_type, "none");
2658 assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null");
2659 assert_eq!(schema, schema);
2660 }
2661 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2662 }
2663
2664 assert_eq!(buffer.as_slice(), &[4, 251, 155]);
2665
2666 Ok(())
2667 }
2668
2669 #[test]
2670 fn test_serialize_decimal_fixed() -> TestResult {
2671 let schema = Schema::parse_str(
2672 r#"{
2673 "type": "fixed",
2674 "name": "FixedDecimal",
2675 "size": 8,
2676 "logicalType": "decimal",
2677 "precision": 16,
2678 "scale": 2
2679 }"#,
2680 )?;
2681
2682 let mut buffer: Vec<u8> = Vec::new();
2683 let names = HashMap::new();
2684 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2685
2686 let val = Decimal::from(&[0, 0, 0, 0, 0, 0, 251, 155]);
2687 val.serialize(&mut serializer)?;
2688
2689 match ().serialize(&mut serializer).map_err(Error::into_details) {
2690 Err(Details::SerializeValueWithSchema {
2691 value_type,
2692 value,
2693 schema,
2694 }) => {
2695 assert_eq!(value_type, "none");
2696 assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null");
2697 assert_eq!(schema, schema);
2698 }
2699 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2700 }
2701
2702 assert_eq!(buffer.as_slice(), &[0, 0, 0, 0, 0, 0, 251, 155]);
2703
2704 Ok(())
2705 }
2706
2707 #[test]
2708 fn test_serialize_bigdecimal() -> TestResult {
2709 let schema = Schema::parse_str(
2710 r#"{
2711 "type": "bytes",
2712 "logicalType": "big-decimal"
2713 }"#,
2714 )?;
2715
2716 let mut buffer: Vec<u8> = Vec::new();
2717 let names = HashMap::new();
2718 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2719
2720 let val = BigDecimal::new(BigInt::new(Sign::Plus, vec![50024]), 2);
2721 val.serialize(&mut serializer)?;
2722
2723 assert_eq!(buffer.as_slice(), &[10, 6, 0, 195, 104, 4]);
2724
2725 Ok(())
2726 }
2727
2728 #[test]
2729 fn test_serialize_uuid() -> TestResult {
2730 let schema = Schema::parse_str(
2731 r#"{
2732 "type": "fixed",
2733 "size": 16,
2734 "logicalType": "uuid",
2735 "name": "FixedUuid"
2736 }"#,
2737 )?;
2738
2739 assert!(!crate::util::is_human_readable());
2740 let mut buffer: Vec<u8> = Vec::new();
2741 let names = HashMap::new();
2742 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2743
2744 "8c28da81-238c-4326-bddd-4e3d00cc5099"
2745 .parse::<Uuid>()?
2746 .serialize(&mut serializer)?;
2747
2748 match 1_u8.serialize(&mut serializer).map_err(Error::into_details) {
2749 Err(Details::SerializeValueWithSchema {
2750 value_type,
2751 value,
2752 schema,
2753 }) => {
2754 assert_eq!(value_type, "u8");
2755 assert_eq!(value, "1. Cause: Expected: Uuid. Got: Int");
2756 assert_eq!(schema, schema);
2757 }
2758 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2759 }
2760
2761 assert_eq!(
2762 buffer.as_slice(),
2763 &[
2764 140, 40, 218, 129, 35, 140, 67, 38, 189, 221, 78, 61, 0, 204, 80, 153
2765 ]
2766 );
2767
2768 Ok(())
2769 }
2770
2771 #[test]
2772 fn test_serialize_date() -> TestResult {
2773 let schema = Schema::parse_str(
2774 r#"{
2775 "type": "int",
2776 "logicalType": "date"
2777 }"#,
2778 )?;
2779
2780 let mut buffer: Vec<u8> = Vec::new();
2781 let names = HashMap::new();
2782 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2783
2784 100_u8.serialize(&mut serializer)?;
2785 1000_u16.serialize(&mut serializer)?;
2786 10000_u32.serialize(&mut serializer)?;
2787 1000_i16.serialize(&mut serializer)?;
2788 10000_i32.serialize(&mut serializer)?;
2789
2790 match 10000_f32
2791 .serialize(&mut serializer)
2792 .map_err(Error::into_details)
2793 {
2794 Err(Details::SerializeValueWithSchema {
2795 value_type,
2796 value,
2797 schema,
2798 }) => {
2799 assert_eq!(value_type, "f32");
2800 assert_eq!(value, "10000. Cause: Expected: Date. Got: Float");
2801 assert_eq!(schema, schema);
2802 }
2803 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2804 }
2805
2806 assert_eq!(
2807 buffer.as_slice(),
2808 &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1]
2809 );
2810
2811 Ok(())
2812 }
2813
2814 #[test]
2815 fn test_serialize_time_millis() -> TestResult {
2816 let schema = Schema::parse_str(
2817 r#"{
2818 "type": "int",
2819 "logicalType": "time-millis"
2820 }"#,
2821 )?;
2822
2823 let mut buffer: Vec<u8> = Vec::new();
2824 let names = HashMap::new();
2825 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2826
2827 100_u8.serialize(&mut serializer)?;
2828 1000_u16.serialize(&mut serializer)?;
2829 10000_u32.serialize(&mut serializer)?;
2830 1000_i16.serialize(&mut serializer)?;
2831 10000_i32.serialize(&mut serializer)?;
2832
2833 match 10000_f32
2834 .serialize(&mut serializer)
2835 .map_err(Error::into_details)
2836 {
2837 Err(Details::SerializeValueWithSchema {
2838 value_type,
2839 value,
2840 schema,
2841 }) => {
2842 assert_eq!(value_type, "f32");
2843 assert_eq!(value, "10000. Cause: Expected: TimeMillis. Got: Float");
2844 assert_eq!(schema, schema);
2845 }
2846 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2847 }
2848
2849 assert_eq!(
2850 buffer.as_slice(),
2851 &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1]
2852 );
2853
2854 Ok(())
2855 }
2856
2857 #[test]
2858 fn test_serialize_time_micros() -> TestResult {
2859 let schema = Schema::parse_str(
2860 r#"{
2861 "type": "long",
2862 "logicalType": "time-micros"
2863 }"#,
2864 )?;
2865
2866 let mut buffer: Vec<u8> = Vec::new();
2867 let names = HashMap::new();
2868 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2869
2870 100_u8.serialize(&mut serializer)?;
2871 1000_u16.serialize(&mut serializer)?;
2872 10000_u32.serialize(&mut serializer)?;
2873 1000_i16.serialize(&mut serializer)?;
2874 10000_i32.serialize(&mut serializer)?;
2875 10000_i64.serialize(&mut serializer)?;
2876
2877 match 10000_f32
2878 .serialize(&mut serializer)
2879 .map_err(Error::into_details)
2880 {
2881 Err(Details::SerializeValueWithSchema {
2882 value_type,
2883 value,
2884 schema,
2885 }) => {
2886 assert_eq!(value_type, "f32");
2887 assert_eq!(value, "10000. Cause: Expected: TimeMicros. Got: Float");
2888 assert_eq!(schema, schema);
2889 }
2890 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2891 }
2892
2893 assert_eq!(
2894 buffer.as_slice(),
2895 &[
2896 200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1
2897 ]
2898 );
2899
2900 Ok(())
2901 }
2902
2903 #[test]
2904 fn test_serialize_timestamp() -> TestResult {
2905 for precision in ["millis", "micros", "nanos"] {
2906 let schema = Schema::parse_str(&format!(
2907 r#"{{
2908 "type": "long",
2909 "logicalType": "timestamp-{precision}"
2910 }}"#
2911 ))?;
2912
2913 let mut buffer: Vec<u8> = Vec::new();
2914 let names = HashMap::new();
2915 let mut serializer =
2916 SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2917
2918 100_u8.serialize(&mut serializer)?;
2919 1000_u16.serialize(&mut serializer)?;
2920 10000_u32.serialize(&mut serializer)?;
2921 1000_i16.serialize(&mut serializer)?;
2922 10000_i32.serialize(&mut serializer)?;
2923 10000_i64.serialize(&mut serializer)?;
2924
2925 match 10000_f64
2926 .serialize(&mut serializer)
2927 .map_err(Error::into_details)
2928 {
2929 Err(Details::SerializeValueWithSchema {
2930 value_type,
2931 value,
2932 schema,
2933 }) => {
2934 let mut capital_precision = precision.to_string();
2935 if let Some(c) = capital_precision.chars().next() {
2936 capital_precision.replace_range(..1, &c.to_uppercase().to_string());
2937 }
2938 assert_eq!(value_type, "f64");
2939 assert_eq!(
2940 value,
2941 format!(
2942 "10000. Cause: Expected: Timestamp{capital_precision}. Got: Double"
2943 )
2944 );
2945 assert_eq!(schema, schema);
2946 }
2947 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2948 }
2949
2950 assert_eq!(
2951 buffer.as_slice(),
2952 &[
2953 200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1
2954 ]
2955 );
2956 }
2957
2958 Ok(())
2959 }
2960
2961 #[test]
2962 fn test_serialize_duration() -> TestResult {
2963 let schema = Schema::parse_str(
2964 r#"{
2965 "type": "fixed",
2966 "size": 12,
2967 "name": "duration",
2968 "logicalType": "duration"
2969 }"#,
2970 )?;
2971
2972 let mut buffer: Vec<u8> = Vec::new();
2973 let names = HashMap::new();
2974 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2975
2976 let duration_bytes =
2977 ByteArray::new(Duration::new(Months::new(3), Days::new(2), Millis::new(1200)).into());
2978 duration_bytes.serialize(&mut serializer)?;
2979
2980 match [1; 12]
2981 .serialize(&mut serializer)
2982 .map_err(Error::into_details)
2983 {
2984 Err(Details::SerializeValueWithSchema {
2985 value_type,
2986 value,
2987 schema,
2988 }) => {
2989 assert_eq!(value_type, "tuple"); assert_eq!(
2991 value,
2992 "tuple (len=12). Cause: Expected: Duration. Got: Array"
2993 );
2994 assert_eq!(schema, schema);
2995 }
2996 unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2997 }
2998
2999 assert_eq!(buffer.as_slice(), &[3, 0, 0, 0, 2, 0, 0, 0, 176, 4, 0, 0]);
3000
3001 Ok(())
3002 }
3003
3004 #[test]
3005 fn test_serialize_recursive_record() -> TestResult {
3006 let schema = Schema::parse_str(
3007 r#"{
3008 "type": "record",
3009 "name": "TestRecord",
3010 "fields": [
3011 {"name": "stringField", "type": "string"},
3012 {"name": "intField", "type": "int"},
3013 {"name": "bigDecimalField", "type": {"type": "bytes", "logicalType": "big-decimal"}},
3014 {"name": "uuidField", "type": "fixed", "size": 16, "logicalType": "uuid"},
3015 {"name": "innerRecord", "type": ["null", "TestRecord"]}
3016 ]
3017 }"#,
3018 )?;
3019
3020 #[derive(Serialize)]
3021 #[serde(rename_all = "camelCase")]
3022 struct TestRecord {
3023 string_field: String,
3024 int_field: i32,
3025 big_decimal_field: BigDecimal,
3026 uuid_field: Uuid,
3027 inner_record: Option<Box<TestRecord>>,
3029 }
3030
3031 assert!(!crate::util::is_human_readable());
3032 let mut buffer: Vec<u8> = Vec::new();
3033 let rs = ResolvedSchema::try_from(&schema)?;
3034 let mut serializer =
3035 SchemaAwareWriteSerializer::new(&mut buffer, &schema, rs.get_names(), None);
3036
3037 let good_record = TestRecord {
3038 string_field: String::from("test"),
3039 int_field: 10,
3040 big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![50024]), 2),
3041 uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5098".parse::<Uuid>()?,
3042 inner_record: Some(Box::new(TestRecord {
3043 string_field: String::from("inner_test"),
3044 int_field: 100,
3045 big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![20038]), 2),
3046 uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5099".parse::<Uuid>()?,
3047 inner_record: None,
3048 })),
3049 };
3050 good_record.serialize(&mut serializer)?;
3051
3052 assert_eq!(
3053 buffer.as_slice(),
3054 &[
3055 8, 116, 101, 115, 116, 20, 10, 6, 0, 195, 104, 4, 140, 40, 218, 129, 35, 140, 67,
3056 38, 189, 221, 78, 61, 0, 204, 80, 152, 2, 20, 105, 110, 110, 101, 114, 95, 116,
3057 101, 115, 116, 200, 1, 8, 4, 78, 70, 4, 140, 40, 218, 129, 35, 140, 67, 38, 189,
3058 221, 78, 61, 0, 204, 80, 153, 0
3059 ]
3060 );
3061
3062 Ok(())
3063 }
3064
3065 #[test]
3066 fn avro_rs_337_serialize_union_record_variant() -> TestResult {
3067 let schema = Schema::parse_str(
3068 r#"{
3069 "type": "record",
3070 "name": "TestRecord",
3071 "fields": [{
3072 "name": "innerUnion", "type": [
3073 {"type": "record", "name": "innerRecordFoo", "fields": [
3074 {"name": "foo", "type": "string"}
3075 ]},
3076 {"type": "record", "name": "innerRecordBar", "fields": [
3077 {"name": "bar", "type": "string"}
3078 ]},
3079 {"name": "intField", "type": "int"},
3080 {"name": "stringField", "type": "string"}
3081 ]
3082 }]
3083 }"#,
3084 )?;
3085
3086 #[derive(Serialize)]
3087 #[serde(rename_all = "camelCase")]
3088 struct TestRecord {
3089 inner_union: InnerUnion,
3090 }
3091
3092 #[derive(Serialize)]
3093 #[serde(untagged)]
3094 enum InnerUnion {
3095 InnerVariantFoo(InnerRecordFoo),
3096 InnerVariantBar(InnerRecordBar),
3097 IntField(i32),
3098 StringField(String),
3099 }
3100
3101 #[derive(Serialize)]
3102 #[serde(rename = "innerRecordFoo")]
3103 struct InnerRecordFoo {
3104 foo: String,
3105 }
3106
3107 #[derive(Serialize)]
3108 #[serde(rename = "innerRecordBar")]
3109 struct InnerRecordBar {
3110 bar: String,
3111 }
3112
3113 let mut buffer: Vec<u8> = Vec::new();
3114 let rs = ResolvedSchema::try_from(&schema)?;
3115 let mut serializer =
3116 SchemaAwareWriteSerializer::new(&mut buffer, &schema, rs.get_names(), None);
3117
3118 let foo_record = TestRecord {
3119 inner_union: InnerUnion::InnerVariantFoo(InnerRecordFoo {
3120 foo: String::from("foo"),
3121 }),
3122 };
3123 foo_record.serialize(&mut serializer)?;
3124 let bar_record = TestRecord {
3125 inner_union: InnerUnion::InnerVariantBar(InnerRecordBar {
3126 bar: String::from("bar"),
3127 }),
3128 };
3129 bar_record.serialize(&mut serializer)?;
3130 let int_record = TestRecord {
3131 inner_union: InnerUnion::IntField(1),
3132 };
3133 int_record.serialize(&mut serializer)?;
3134 let string_record = TestRecord {
3135 inner_union: InnerUnion::StringField(String::from("string")),
3136 };
3137 string_record.serialize(&mut serializer)?;
3138 Ok(())
3139 }
3140
3141 #[test]
3142 fn avro_rs_337_serialize_option_union_record_variant() -> TestResult {
3143 let schema = Schema::parse_str(
3144 r#"{
3145 "type": "record",
3146 "name": "TestRecord",
3147 "fields": [{
3148 "name": "innerUnion", "type": [
3149 "null",
3150 {"type": "record", "name": "innerRecordFoo", "fields": [
3151 {"name": "foo", "type": "string"}
3152 ]},
3153 {"type": "record", "name": "innerRecordBar", "fields": [
3154 {"name": "bar", "type": "string"}
3155 ]},
3156 {"name": "intField", "type": "int"},
3157 {"name": "stringField", "type": "string"}
3158 ]
3159 }]
3160 }"#,
3161 )?;
3162
3163 #[derive(Serialize)]
3164 #[serde(rename_all = "camelCase")]
3165 struct TestRecord {
3166 inner_union: Option<InnerUnion>,
3167 }
3168
3169 #[derive(Serialize)]
3170 #[serde(untagged)]
3171 enum InnerUnion {
3172 InnerVariantFoo(InnerRecordFoo),
3173 InnerVariantBar(InnerRecordBar),
3174 IntField(i32),
3175 StringField(String),
3176 }
3177
3178 #[derive(Serialize)]
3179 #[serde(rename = "innerRecordFoo")]
3180 struct InnerRecordFoo {
3181 foo: String,
3182 }
3183
3184 #[derive(Serialize)]
3185 #[serde(rename = "innerRecordBar")]
3186 struct InnerRecordBar {
3187 bar: String,
3188 }
3189
3190 let mut buffer: Vec<u8> = Vec::new();
3191 let rs = ResolvedSchema::try_from(&schema)?;
3192 let mut serializer =
3193 SchemaAwareWriteSerializer::new(&mut buffer, &schema, rs.get_names(), None);
3194
3195 let null_record = TestRecord { inner_union: None };
3196 null_record.serialize(&mut serializer)?;
3197 let foo_record = TestRecord {
3198 inner_union: Some(InnerUnion::InnerVariantFoo(InnerRecordFoo {
3199 foo: String::from("foo"),
3200 })),
3201 };
3202 foo_record.serialize(&mut serializer)?;
3203 let bar_record = TestRecord {
3204 inner_union: Some(InnerUnion::InnerVariantBar(InnerRecordBar {
3205 bar: String::from("bar"),
3206 })),
3207 };
3208 bar_record.serialize(&mut serializer)?;
3209 let int_record = TestRecord {
3210 inner_union: Some(InnerUnion::IntField(1)),
3211 };
3212 int_record.serialize(&mut serializer)?;
3213 let string_record = TestRecord {
3214 inner_union: Some(InnerUnion::StringField(String::from("string"))),
3215 };
3216 string_record.serialize(&mut serializer)?;
3217 Ok(())
3218 }
3219
3220 #[test]
3221 fn avro_rs_351_different_field_order_serde_vs_schema() -> TestResult {
3222 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
3223 struct Foo {
3224 a: String,
3225 b: String,
3226 c: usize,
3227 d: f64,
3228 e: usize,
3229 }
3230 let schema = Schema::parse_str(
3231 r#"
3232 {
3233 "type":"record",
3234 "name":"Foo",
3235 "fields": [
3236 {
3237 "name":"b",
3238 "type":"string"
3239 },
3240 {
3241 "name":"a",
3242 "type":"string"
3243 },
3244 {
3245 "name":"d",
3246 "type":"double"
3247 },
3248 {
3249 "name":"e",
3250 "type":"long"
3251 },
3252 {
3253 "name":"c",
3254 "type":"long"
3255 }
3256 ]
3257 }
3258 "#,
3259 )?;
3260
3261 let mut writer = Writer::new(&schema, Vec::new())?;
3262 writer.append_ser(Foo {
3263 a: "Hello".into(),
3264 b: "World".into(),
3265 c: 42,
3266 d: std::f64::consts::PI,
3267 e: 5,
3268 })?;
3269 let encoded = writer.into_inner()?;
3270 let mut reader = Reader::with_schema(&schema, &encoded[..])?;
3271 let decoded = from_value::<Foo>(&reader.next().unwrap()?)?;
3272 assert_eq!(
3273 decoded,
3274 Foo {
3275 a: "Hello".into(),
3276 b: "World".into(),
3277 c: 42,
3278 d: std::f64::consts::PI,
3279 e: 5
3280 }
3281 );
3282 Ok(())
3283 }
3284
3285 #[test]
3286 fn avro_rs_414_serialize_char_as_string() -> TestResult {
3287 let schema = Schema::String;
3288
3289 let mut buffer: Vec<u8> = Vec::new();
3290 let names = HashMap::new();
3291 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3292
3293 'a'.serialize(&mut serializer)?;
3294
3295 assert_eq!(buffer.as_slice(), &[2, b'a']);
3296
3297 Ok(())
3298 }
3299
3300 #[test]
3301 fn avro_rs_414_serialize_char_as_bytes() -> TestResult {
3302 let schema = Schema::Bytes;
3303
3304 let mut buffer: Vec<u8> = Vec::new();
3305 let names = HashMap::new();
3306 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3307
3308 'a'.serialize(&mut serializer)?;
3309
3310 assert_eq!(buffer.as_slice(), &[2, b'a']);
3311
3312 Ok(())
3313 }
3314
3315 #[test]
3316 fn avro_rs_414_serialize_char_as_fixed() -> TestResult {
3317 let schema = Schema::Fixed(FixedSchema {
3318 name: Name::new("char")?,
3319 aliases: None,
3320 doc: None,
3321 size: 4,
3322 default: None,
3323 attributes: Default::default(),
3324 });
3325
3326 let mut buffer: Vec<u8> = Vec::new();
3327 let names = HashMap::new();
3328 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3329
3330 'a'.serialize(&mut serializer)?;
3331
3332 assert_eq!(buffer.as_slice(), &[b'a', 0, 0, 0]);
3333
3334 Ok(())
3335 }
3336
3337 #[test]
3338 fn avro_rs_414_serialize_emoji_char_as_string() -> TestResult {
3339 let schema = Schema::String;
3340
3341 let mut buffer: Vec<u8> = Vec::new();
3342 let names = HashMap::new();
3343 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3344
3345 '👹'.serialize(&mut serializer)?;
3346
3347 assert_eq!(buffer.as_slice(), &[8, 240, 159, 145, 185]);
3348
3349 Ok(())
3350 }
3351
3352 #[test]
3353 fn avro_rs_414_serialize_emoji_char_as_bytes() -> TestResult {
3354 let schema = Schema::Bytes;
3355
3356 let mut buffer: Vec<u8> = Vec::new();
3357 let names = HashMap::new();
3358 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3359
3360 '👹'.serialize(&mut serializer)?;
3361
3362 assert_eq!(buffer.as_slice(), &[8, 240, 159, 145, 185]);
3363
3364 Ok(())
3365 }
3366
3367 #[test]
3368 fn avro_rs_414_serialize_emoji_char_as_fixed() -> TestResult {
3369 let schema = Schema::Fixed(FixedSchema {
3370 name: Name::new("char")?,
3371 aliases: None,
3372 doc: None,
3373 size: 4,
3374 default: None,
3375 attributes: Default::default(),
3376 });
3377
3378 let mut buffer: Vec<u8> = Vec::new();
3379 let names = HashMap::new();
3380 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3381
3382 '👹'.serialize(&mut serializer)?;
3383
3384 assert_eq!(buffer.as_slice(), &[121, 244, 1, 0]);
3387
3388 Ok(())
3389 }
3390
3391 #[test]
3392 fn avro_rs_414_serialize_char_as_fixed_wrong_name() -> TestResult {
3393 let schema = Schema::Fixed(FixedSchema {
3394 name: Name::new("characters")?,
3395 aliases: None,
3396 doc: None,
3397 size: 4,
3398 default: None,
3399 attributes: Default::default(),
3400 });
3401
3402 let mut buffer: Vec<u8> = Vec::new();
3403 let names = HashMap::new();
3404 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3405
3406 assert!(matches!(
3407 'a'.serialize(&mut serializer).unwrap_err().details(),
3408 Details::SerializeValueWithSchema { .. }
3409 ));
3410
3411 Ok(())
3412 }
3413
3414 #[test]
3415 fn avro_rs_414_serialize_char_as_fixed_wrong_size() -> TestResult {
3416 let schema = Schema::Fixed(FixedSchema {
3417 name: Name::new("char")?,
3418 aliases: None,
3419 doc: None,
3420 size: 1,
3421 default: None,
3422 attributes: Default::default(),
3423 });
3424
3425 let mut buffer: Vec<u8> = Vec::new();
3426 let names = HashMap::new();
3427 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3428
3429 assert!(matches!(
3430 'a'.serialize(&mut serializer).unwrap_err().details(),
3431 Details::SerializeValueWithSchema { .. }
3432 ));
3433
3434 Ok(())
3435 }
3436
3437 #[test]
3438 fn avro_rs_414_serialize_i128_as_fixed() -> TestResult {
3439 let schema = Schema::Fixed(FixedSchema {
3440 name: Name::new("i128")?,
3441 aliases: None,
3442 doc: None,
3443 size: 16,
3444 default: None,
3445 attributes: Default::default(),
3446 });
3447
3448 let mut buffer: Vec<u8> = Vec::new();
3449 let names = HashMap::new();
3450 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3451
3452 let bytes_written = i128::MAX.serialize(&mut serializer)?;
3453 assert_eq!(bytes_written, 16);
3454
3455 assert_eq!(
3456 buffer.as_slice(),
3457 &[
3458 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3459 0xFF, 0x7F
3460 ]
3461 );
3462
3463 Ok(())
3464 }
3465
3466 #[test]
3467 fn avro_rs_414_serialize_i128_as_fixed_wrong_name() -> TestResult {
3468 let schema = Schema::Fixed(FixedSchema {
3469 name: Name::new("onehundredtwentyeight")?,
3470 aliases: None,
3471 doc: None,
3472 size: 16,
3473 default: None,
3474 attributes: Default::default(),
3475 });
3476
3477 let mut buffer: Vec<u8> = Vec::new();
3478 let names = HashMap::new();
3479 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3480
3481 assert!(matches!(
3482 i128::MAX.serialize(&mut serializer).unwrap_err().details(),
3483 Details::SerializeValueWithSchema { .. }
3484 ));
3485
3486 Ok(())
3487 }
3488
3489 #[test]
3490 fn avro_rs_414_serialize_i128_as_fixed_wrong_size() -> TestResult {
3491 let schema = Schema::Fixed(FixedSchema {
3492 name: Name::new("i128")?,
3493 aliases: None,
3494 doc: None,
3495 size: 8,
3496 default: None,
3497 attributes: Default::default(),
3498 });
3499
3500 let mut buffer: Vec<u8> = Vec::new();
3501 let names = HashMap::new();
3502 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3503
3504 assert!(matches!(
3505 i128::MAX.serialize(&mut serializer).unwrap_err().details(),
3506 Details::SerializeValueWithSchema { .. }
3507 ));
3508
3509 Ok(())
3510 }
3511
3512 #[test]
3513 fn avro_rs_414_serialize_u128_as_fixed() -> TestResult {
3514 let schema = Schema::Fixed(FixedSchema {
3515 name: Name::new("u128")?,
3516 aliases: None,
3517 doc: None,
3518 size: 16,
3519 default: None,
3520 attributes: Default::default(),
3521 });
3522
3523 let mut buffer: Vec<u8> = Vec::new();
3524 let names = HashMap::new();
3525 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3526
3527 let bytes_written = u128::MAX.serialize(&mut serializer)?;
3528 assert_eq!(bytes_written, 16);
3529
3530 assert_eq!(
3531 buffer.as_slice(),
3532 &[
3533 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
3534 0xFF, 0xFF
3535 ]
3536 );
3537
3538 Ok(())
3539 }
3540
3541 #[test]
3542 fn avro_rs_414_serialize_u128_as_fixed_wrong_name() -> TestResult {
3543 let schema = Schema::Fixed(FixedSchema {
3544 name: Name::new("onehundredtwentyeight")?,
3545 aliases: None,
3546 doc: None,
3547 size: 16,
3548 default: None,
3549 attributes: Default::default(),
3550 });
3551
3552 let mut buffer: Vec<u8> = Vec::new();
3553 let names = HashMap::new();
3554 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3555
3556 assert!(matches!(
3557 u128::MAX.serialize(&mut serializer).unwrap_err().details(),
3558 Details::SerializeValueWithSchema { .. }
3559 ));
3560
3561 Ok(())
3562 }
3563
3564 #[test]
3565 fn avro_rs_414_serialize_u128_as_fixed_wrong_size() -> TestResult {
3566 let schema = Schema::Fixed(FixedSchema {
3567 name: Name::new("u128")?,
3568 aliases: None,
3569 doc: None,
3570 size: 8,
3571 default: None,
3572 attributes: Default::default(),
3573 });
3574
3575 let mut buffer: Vec<u8> = Vec::new();
3576 let names = HashMap::new();
3577 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3578
3579 assert!(matches!(
3580 u128::MAX.serialize(&mut serializer).unwrap_err().details(),
3581 Details::SerializeValueWithSchema { .. }
3582 ));
3583
3584 Ok(())
3585 }
3586
3587 #[test]
3588 fn avro_rs_421_serialize_bytes_union_of_fixed() -> TestResult {
3589 let schema = Schema::parse_str(
3590 r#"[
3591 { "name": "fixed4", "type": "fixed", "size": 4 },
3592 { "name": "fixed8", "type": "fixed", "size": 8 }
3593 ]"#,
3594 )
3595 .unwrap();
3596
3597 let mut buffer: Vec<u8> = Vec::new();
3598 let names = HashMap::new();
3599 let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
3600 let bytes_written = crate::serde::fixed::serialize(&[0, 1, 2, 3], &mut serializer)?;
3601 assert_eq!(bytes_written, 4);
3602 let bytes_written =
3603 crate::serde::fixed::serialize(&[4, 5, 6, 7, 8, 9, 10, 11], &mut serializer)?;
3604 assert_eq!(bytes_written, 8);
3605
3606 assert_eq!(buffer, &[0, 0, 1, 2, 3, 2, 4, 5, 6, 7, 8, 9, 10, 11][..]);
3607
3608 Ok(())
3609 }
3610}