apache_avro/
ser_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for serde-compatible schema-aware serialization
19//! which writes directly to a `Write` stream
20
21use crate::{
22    bigdecimal::big_decimal_as_bytes,
23    encode::{encode_int, encode_long},
24    error::{Details, Error},
25    schema::{Name, NamesRef, Namespace, RecordField, RecordSchema, Schema},
26};
27use bigdecimal::BigDecimal;
28use serde::{Serialize, ser};
29use std::{borrow::Cow, io::Write, str::FromStr};
30
31const COLLECTION_SERIALIZER_ITEM_LIMIT: usize = 1024;
32const COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY: usize = 32;
33const SINGLE_VALUE_INIT_BUFFER_SIZE: usize = 128;
34
35/// The sequence serializer for [`SchemaAwareWriteSerializer`].
36/// [`SchemaAwareWriteSerializeSeq`] may break large arrays up into multiple blocks to avoid having
37/// to obtain the length of the entire array before being able to write any data to the underlying
38/// [`std::fmt::Write`] stream.  (See the
39/// [Data Serialization and Deserialization](https://avro.apache.org/docs/1.12.0/specification/#data-serialization-and-deserialization)
40/// section of the Avro spec for more info.)
41pub struct SchemaAwareWriteSerializeSeq<'a, 's, W: Write> {
42    ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
43    item_schema: &'s Schema,
44    item_buffer_size: usize,
45    item_buffers: Vec<Vec<u8>>,
46    bytes_written: usize,
47}
48
49impl<'a, 's, W: Write> SchemaAwareWriteSerializeSeq<'a, 's, W> {
50    fn new(
51        ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
52        item_schema: &'s Schema,
53        len: Option<usize>,
54    ) -> SchemaAwareWriteSerializeSeq<'a, 's, W> {
55        SchemaAwareWriteSerializeSeq {
56            ser,
57            item_schema,
58            item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
59            item_buffers: Vec::with_capacity(
60                len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY),
61            ),
62            bytes_written: 0,
63        }
64    }
65
66    fn write_buffered_items(&mut self) -> Result<(), Error> {
67        if !self.item_buffers.is_empty() {
68            self.bytes_written +=
69                encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
70            for item in self.item_buffers.drain(..) {
71                self.bytes_written += self
72                    .ser
73                    .writer
74                    .write(item.as_slice())
75                    .map_err(Details::WriteBytes)?;
76            }
77        }
78
79        Ok(())
80    }
81
82    fn serialize_element<T: ser::Serialize>(&mut self, value: &T) -> Result<(), Error> {
83        let mut item_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
84        let mut item_ser = SchemaAwareWriteSerializer::new(
85            &mut item_buffer,
86            self.item_schema,
87            self.ser.names,
88            self.ser.enclosing_namespace.clone(),
89        );
90        value.serialize(&mut item_ser)?;
91
92        self.item_buffer_size = std::cmp::max(self.item_buffer_size, item_buffer.len() + 16);
93
94        self.item_buffers.push(item_buffer);
95
96        if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
97            self.write_buffered_items()?;
98        }
99
100        Ok(())
101    }
102
103    fn end(mut self) -> Result<usize, Error> {
104        self.write_buffered_items()?;
105        self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Details::WriteBytes)?;
106
107        Ok(self.bytes_written)
108    }
109}
110
111impl<W: Write> ser::SerializeSeq for SchemaAwareWriteSerializeSeq<'_, '_, W> {
112    type Ok = usize;
113    type Error = Error;
114
115    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
116    where
117        T: ?Sized + ser::Serialize,
118    {
119        self.serialize_element(&value)
120    }
121
122    fn end(self) -> Result<Self::Ok, Self::Error> {
123        self.end()
124    }
125}
126
127impl<W: Write> ser::SerializeTuple for SchemaAwareWriteSerializeSeq<'_, '_, W> {
128    type Ok = usize;
129    type Error = Error;
130
131    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
132    where
133        T: ?Sized + ser::Serialize,
134    {
135        ser::SerializeSeq::serialize_element(self, value)
136    }
137
138    fn end(self) -> Result<Self::Ok, Self::Error> {
139        ser::SerializeSeq::end(self)
140    }
141}
142
143/// The map serializer for [`SchemaAwareWriteSerializer`].
144/// [`SchemaAwareWriteSerializeMap`] may break large maps up into multiple blocks to avoid having to
145/// obtain the size of the entire map before being able to write any data to the underlying
146/// [`std::fmt::Write`] stream.  (See the
147/// [Data Serialization and Deserialization](https://avro.apache.org/docs/1.12.0/specification/#data-serialization-and-deserialization)
148/// section of the Avro spec for more info.)
149pub struct SchemaAwareWriteSerializeMap<'a, 's, W: Write> {
150    ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
151    item_schema: &'s Schema,
152    item_buffer_size: usize,
153    item_buffers: Vec<Vec<u8>>,
154    bytes_written: usize,
155}
156
157impl<'a, 's, W: Write> SchemaAwareWriteSerializeMap<'a, 's, W> {
158    fn new(
159        ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
160        item_schema: &'s Schema,
161        len: Option<usize>,
162    ) -> SchemaAwareWriteSerializeMap<'a, 's, W> {
163        SchemaAwareWriteSerializeMap {
164            ser,
165            item_schema,
166            item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
167            item_buffers: Vec::with_capacity(
168                len.unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY),
169            ),
170            bytes_written: 0,
171        }
172    }
173
174    fn write_buffered_items(&mut self) -> Result<(), Error> {
175        if !self.item_buffers.is_empty() {
176            self.bytes_written +=
177                encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
178            for item in self.item_buffers.drain(..) {
179                self.bytes_written += self
180                    .ser
181                    .writer
182                    .write(item.as_slice())
183                    .map_err(Details::WriteBytes)?;
184            }
185        }
186
187        Ok(())
188    }
189}
190
191impl<W: Write> ser::SerializeMap for SchemaAwareWriteSerializeMap<'_, '_, W> {
192    type Ok = usize;
193    type Error = Error;
194
195    fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
196    where
197        T: ?Sized + ser::Serialize,
198    {
199        let mut element_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
200        let string_schema = Schema::String;
201        let mut key_ser = SchemaAwareWriteSerializer::new(
202            &mut element_buffer,
203            &string_schema,
204            self.ser.names,
205            self.ser.enclosing_namespace.clone(),
206        );
207        key.serialize(&mut key_ser)?;
208
209        self.item_buffers.push(element_buffer);
210
211        Ok(())
212    }
213
214    fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
215    where
216        T: ?Sized + ser::Serialize,
217    {
218        let last_index = self.item_buffers.len() - 1;
219        let element_buffer = &mut self.item_buffers[last_index];
220        let mut val_ser = SchemaAwareWriteSerializer::new(
221            element_buffer,
222            self.item_schema,
223            self.ser.names,
224            self.ser.enclosing_namespace.clone(),
225        );
226        value.serialize(&mut val_ser)?;
227
228        self.item_buffer_size = std::cmp::max(self.item_buffer_size, element_buffer.len() + 16);
229
230        if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
231            self.write_buffered_items()?;
232        }
233
234        Ok(())
235    }
236
237    fn end(mut self) -> Result<Self::Ok, Self::Error> {
238        self.write_buffered_items()?;
239        self.bytes_written += self.ser.writer.write(&[0u8]).map_err(Details::WriteBytes)?;
240
241        Ok(self.bytes_written)
242    }
243}
244
245/// The struct serializer for [`SchemaAwareWriteSerializer`], which can serialize Avro records.
246/// [`SchemaAwareWriteSerializeStruct`] can accept fields out of order, but doing so incurs a
247/// performance penalty, since it requires [`SchemaAwareWriteSerializeStruct`] to buffer serialized
248/// values in order to write them to the stream in order.
249pub struct SchemaAwareWriteSerializeStruct<'a, 's, W: Write> {
250    ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
251    record_schema: &'s RecordSchema,
252    bytes_written: usize,
253}
254
255impl<'a, 's, W: Write> SchemaAwareWriteSerializeStruct<'a, 's, W> {
256    fn new(
257        ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
258        record_schema: &'s RecordSchema,
259    ) -> SchemaAwareWriteSerializeStruct<'a, 's, W> {
260        SchemaAwareWriteSerializeStruct {
261            ser,
262            record_schema,
263            bytes_written: 0,
264        }
265    }
266
267    fn serialize_next_field<T>(&mut self, field: &RecordField, value: &T) -> Result<(), Error>
268    where
269        T: ?Sized + ser::Serialize,
270    {
271        // If we receive fields in order, write them directly to the main writer
272        let mut value_ser = SchemaAwareWriteSerializer::new(
273            &mut *self.ser.writer,
274            &field.schema,
275            self.ser.names,
276            self.ser.enclosing_namespace.clone(),
277        );
278        self.bytes_written += value.serialize(&mut value_ser)?;
279
280        Ok(())
281    }
282
283    fn end(self) -> Result<usize, Error> {
284        Ok(self.bytes_written)
285    }
286}
287
288impl<W: Write> ser::SerializeStruct for SchemaAwareWriteSerializeStruct<'_, '_, W> {
289    type Ok = usize;
290    type Error = Error;
291
292    fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
293    where
294        T: ?Sized + ser::Serialize,
295    {
296        let record_field = self
297            .record_schema
298            .lookup
299            .get(key)
300            .and_then(|idx| self.record_schema.fields.get(*idx));
301
302        match record_field {
303            Some(field) => {
304                // self.item_count += 1;
305                self.serialize_next_field(field, value).map_err(|e| {
306                    Details::SerializeRecordFieldWithSchema {
307                        field_name: key,
308                        record_schema: Schema::Record(self.record_schema.clone()),
309                        error: Box::new(e),
310                    }
311                    .into()
312                })
313            }
314            None => Err(Details::FieldName(String::from(key)).into()),
315        }
316    }
317
318    fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> {
319        let skipped_field = self
320            .record_schema
321            .lookup
322            .get(key)
323            .and_then(|idx| self.record_schema.fields.get(*idx));
324
325        if let Some(skipped_field) = skipped_field {
326            // self.item_count += 1;
327            skipped_field
328                .default
329                .serialize(&mut SchemaAwareWriteSerializer::new(
330                    self.ser.writer,
331                    &skipped_field.schema,
332                    self.ser.names,
333                    self.ser.enclosing_namespace.clone(),
334                ))?;
335        } else {
336            return Err(Details::GetField(key.to_string()).into());
337        }
338
339        Ok(())
340    }
341
342    fn end(self) -> Result<Self::Ok, Self::Error> {
343        self.end()
344    }
345}
346
347impl<W: Write> ser::SerializeStructVariant for SchemaAwareWriteSerializeStruct<'_, '_, W> {
348    type Ok = usize;
349    type Error = Error;
350
351    fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
352    where
353        T: ?Sized + ser::Serialize,
354    {
355        ser::SerializeStruct::serialize_field(self, key, value)
356    }
357
358    fn end(self) -> Result<Self::Ok, Self::Error> {
359        ser::SerializeStruct::end(self)
360    }
361}
362
363/// The tuple struct serializer for [`SchemaAwareWriteSerializer`].
364/// [`SchemaAwareWriteSerializeTupleStruct`] can serialize to an Avro array, record, or big-decimal.
365/// When serializing to a record, fields must be provided in the correct order, since no names are provided.
366pub enum SchemaAwareWriteSerializeTupleStruct<'a, 's, W: Write> {
367    Record(SchemaAwareWriteSerializeStruct<'a, 's, W>),
368    Array(SchemaAwareWriteSerializeSeq<'a, 's, W>),
369}
370
371impl<W: Write> SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
372    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Error>
373    where
374        T: ?Sized + ser::Serialize,
375    {
376        use SchemaAwareWriteSerializeTupleStruct::*;
377        match self {
378            Record(_record_ser) => {
379                unimplemented!("Tuple struct serialization to record is not supported!");
380            }
381            Array(array_ser) => array_ser.serialize_element(&value),
382        }
383    }
384
385    fn end(self) -> Result<usize, Error> {
386        use SchemaAwareWriteSerializeTupleStruct::*;
387        match self {
388            Record(record_ser) => record_ser.end(),
389            Array(array_ser) => array_ser.end(),
390        }
391    }
392}
393
394impl<W: Write> ser::SerializeTupleStruct for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
395    type Ok = usize;
396    type Error = Error;
397
398    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
399    where
400        T: ?Sized + ser::Serialize,
401    {
402        self.serialize_field(&value)
403    }
404
405    fn end(self) -> Result<Self::Ok, Self::Error> {
406        self.end()
407    }
408}
409
410impl<W: Write> ser::SerializeTupleVariant for SchemaAwareWriteSerializeTupleStruct<'_, '_, W> {
411    type Ok = usize;
412    type Error = Error;
413
414    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
415    where
416        T: ?Sized + ser::Serialize,
417    {
418        self.serialize_field(&value)
419    }
420
421    fn end(self) -> Result<Self::Ok, Self::Error> {
422        self.end()
423    }
424}
425
426/// A [`serde::ser::Serializer`] implementation that serializes directly to a [`std::fmt::Write`]
427/// using the provided schema.  If [`SchemaAwareWriteSerializer`] isn't able to match the incoming
428/// data with its schema, it will return an error.
429/// A [`SchemaAwareWriteSerializer`] instance can be re-used to serialize multiple values matching
430/// the schema to its [`std::fmt::Write`] stream.
431pub struct SchemaAwareWriteSerializer<'s, W: Write> {
432    writer: &'s mut W,
433    root_schema: &'s Schema,
434    names: &'s NamesRef<'s>,
435    enclosing_namespace: Namespace,
436}
437
438impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
439    /// Create a new [`SchemaAwareWriteSerializer`].
440    ///
441    /// `writer` is the [`std::fmt::Write`] stream to be written to.
442    ///
443    /// `schema` is the schema of the value to be written.
444    ///
445    /// `names` is the mapping of schema names to schemas, to be used for type reference lookups
446    ///
447    /// `enclosing_namespace` is the enclosing namespace to be used for type reference lookups
448    pub fn new(
449        writer: &'s mut W,
450        schema: &'s Schema,
451        names: &'s NamesRef<'s>,
452        enclosing_namespace: Namespace,
453    ) -> SchemaAwareWriteSerializer<'s, W> {
454        SchemaAwareWriteSerializer {
455            writer,
456            root_schema: schema,
457            names,
458            enclosing_namespace,
459        }
460    }
461
462    fn get_ref_schema(&self, name: &'s Name) -> Result<&'s Schema, Error> {
463        let full_name = match name.namespace {
464            Some(_) => Cow::Borrowed(name),
465            None => Cow::Owned(Name {
466                name: name.name.clone(),
467                namespace: self.enclosing_namespace.clone(),
468            }),
469        };
470
471        let ref_schema = self.names.get(full_name.as_ref()).copied();
472
473        ref_schema.ok_or_else(|| Details::SchemaResolutionError(full_name.as_ref().clone()).into())
474    }
475
476    fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, Error> {
477        let mut bytes_written: usize = 0;
478
479        bytes_written += encode_long(bytes.len() as i64, &mut self.writer)?;
480        bytes_written += self.writer.write(bytes).map_err(Details::WriteBytes)?;
481
482        Ok(bytes_written)
483    }
484
485    fn serialize_bool_with_schema(&mut self, value: bool, schema: &Schema) -> Result<usize, Error> {
486        let create_error = |cause: String| {
487            Error::new(Details::SerializeValueWithSchema {
488                value_type: "bool",
489                value: format!("{value}. Cause: {cause}"),
490                schema: schema.clone(),
491            })
492        };
493
494        match schema {
495            Schema::Boolean => self
496                .writer
497                .write(&[u8::from(value)])
498                .map_err(|e| Details::WriteBytes(e).into()),
499            Schema::Union(union_schema) => {
500                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
501                    match variant_schema {
502                        Schema::Boolean => {
503                            encode_int(i as i32, &mut *self.writer)?;
504                            return self.serialize_bool_with_schema(value, variant_schema);
505                        }
506                        _ => { /* skip */ }
507                    }
508                }
509                Err(create_error(format!(
510                    "No matching Schema::Bool found in {:?}",
511                    union_schema.schemas
512                )))
513            }
514            expected => Err(create_error(format!("Expected {expected}. Got: Bool"))),
515        }
516    }
517
518    fn serialize_i32_with_schema(&mut self, value: i32, schema: &Schema) -> Result<usize, Error> {
519        let create_error = |cause: String| {
520            Error::new(Details::SerializeValueWithSchema {
521                value_type: "int (i8 | i16 | i32)",
522                value: format!("{value}. Cause: {cause}"),
523                schema: schema.clone(),
524            })
525        };
526
527        match schema {
528            Schema::Int | Schema::TimeMillis | Schema::Date => encode_int(value, &mut self.writer),
529            Schema::Long
530            | Schema::TimeMicros
531            | Schema::TimestampMillis
532            | Schema::TimestampMicros
533            | Schema::TimestampNanos
534            | Schema::LocalTimestampMillis
535            | Schema::LocalTimestampMicros
536            | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
537            Schema::Union(union_schema) => {
538                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
539                    match variant_schema {
540                        Schema::Int
541                        | Schema::TimeMillis
542                        | Schema::Date
543                        | Schema::Long
544                        | Schema::TimeMicros
545                        | Schema::TimestampMillis
546                        | Schema::TimestampMicros
547                        | Schema::TimestampNanos
548                        | Schema::LocalTimestampMillis
549                        | Schema::LocalTimestampMicros
550                        | Schema::LocalTimestampNanos => {
551                            encode_int(i as i32, &mut *self.writer)?;
552                            return self.serialize_i32_with_schema(value, variant_schema);
553                        }
554                        _ => { /* skip */ }
555                    }
556                }
557                Err(create_error(format!(
558                    "Cannot find a matching int-like schema in {union_schema:?}"
559                )))
560            }
561            expected => Err(create_error(format!("Expected {expected}. Got: Int/Long"))),
562        }
563    }
564
565    fn serialize_i64_with_schema(&mut self, value: i64, schema: &Schema) -> Result<usize, Error> {
566        let create_error = |cause: String| {
567            Error::new(Details::SerializeValueWithSchema {
568                value_type: "i64",
569                value: format!("{value}. Cause: {cause}"),
570                schema: schema.clone(),
571            })
572        };
573
574        match schema {
575            Schema::Int | Schema::TimeMillis | Schema::Date => {
576                let int_value =
577                    i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
578                encode_int(int_value, &mut self.writer)
579            }
580            Schema::Long
581            | Schema::TimeMicros
582            | Schema::TimestampMillis
583            | Schema::TimestampMicros
584            | Schema::TimestampNanos
585            | Schema::LocalTimestampMillis
586            | Schema::LocalTimestampMicros
587            | Schema::LocalTimestampNanos => encode_long(value, &mut self.writer),
588            Schema::Union(union_schema) => {
589                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
590                    match variant_schema {
591                        Schema::Int
592                        | Schema::TimeMillis
593                        | Schema::Date
594                        | Schema::Long
595                        | Schema::TimeMicros
596                        | Schema::TimestampMillis
597                        | Schema::TimestampMicros
598                        | Schema::TimestampNanos
599                        | Schema::LocalTimestampMillis
600                        | Schema::LocalTimestampMicros
601                        | Schema::LocalTimestampNanos => {
602                            encode_int(i as i32, &mut *self.writer)?;
603                            return self.serialize_i64_with_schema(value, variant_schema);
604                        }
605                        _ => { /* skip */ }
606                    }
607                }
608                Err(create_error(format!(
609                    "Cannot find a matching int/long-like schema in {:?}",
610                    union_schema.schemas
611                )))
612            }
613            expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))),
614        }
615    }
616
617    fn serialize_u8_with_schema(&mut self, value: u8, schema: &Schema) -> Result<usize, Error> {
618        let create_error = |cause: String| {
619            Error::new(Details::SerializeValueWithSchema {
620                value_type: "u8",
621                value: format!("{value}. Cause: {cause}"),
622                schema: schema.clone(),
623            })
624        };
625
626        match schema {
627            Schema::Int | Schema::TimeMillis | Schema::Date => {
628                encode_int(value as i32, &mut self.writer)
629            }
630            Schema::Long
631            | Schema::TimeMicros
632            | Schema::TimestampMillis
633            | Schema::TimestampMicros
634            | Schema::TimestampNanos
635            | Schema::LocalTimestampMillis
636            | Schema::LocalTimestampMicros
637            | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
638            Schema::Bytes => self.write_bytes(&[value]),
639            Schema::Union(union_schema) => {
640                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
641                    match variant_schema {
642                        Schema::Int
643                        | Schema::TimeMillis
644                        | Schema::Date
645                        | Schema::Long
646                        | Schema::TimeMicros
647                        | Schema::TimestampMillis
648                        | Schema::TimestampMicros
649                        | Schema::TimestampNanos
650                        | Schema::LocalTimestampMillis
651                        | Schema::LocalTimestampMicros
652                        | Schema::LocalTimestampNanos
653                        | Schema::Bytes => {
654                            encode_int(i as i32, &mut *self.writer)?;
655                            return self.serialize_u8_with_schema(value, variant_schema);
656                        }
657                        _ => { /* skip */ }
658                    }
659                }
660                Err(create_error(format!(
661                    "Cannot find a matching Int-like, Long-like or Bytes schema in {union_schema:?}"
662                )))
663            }
664            expected => Err(create_error(format!("Expected: {expected}. Got: Int"))),
665        }
666    }
667
668    fn serialize_u32_with_schema(&mut self, value: u32, schema: &Schema) -> Result<usize, Error> {
669        let create_error = |cause: String| {
670            Error::new(Details::SerializeValueWithSchema {
671                value_type: "unsigned int (u16 | u32)",
672                value: format!("{value}. Cause: {cause}"),
673                schema: schema.clone(),
674            })
675        };
676
677        match schema {
678            Schema::Int | Schema::TimeMillis | Schema::Date => {
679                let int_value =
680                    i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
681                encode_int(int_value, &mut self.writer)
682            }
683            Schema::Long
684            | Schema::TimeMicros
685            | Schema::TimestampMillis
686            | Schema::TimestampMicros
687            | Schema::TimestampNanos
688            | Schema::LocalTimestampMillis
689            | Schema::LocalTimestampMicros
690            | Schema::LocalTimestampNanos => encode_long(value as i64, &mut self.writer),
691            Schema::Union(union_schema) => {
692                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
693                    match variant_schema {
694                        Schema::Int
695                        | Schema::TimeMillis
696                        | Schema::Date
697                        | Schema::Long
698                        | Schema::TimeMicros
699                        | Schema::TimestampMillis
700                        | Schema::TimestampMicros
701                        | Schema::TimestampNanos
702                        | Schema::LocalTimestampMillis
703                        | Schema::LocalTimestampMicros
704                        | Schema::LocalTimestampNanos => {
705                            encode_int(i as i32, &mut *self.writer)?;
706                            return self.serialize_u32_with_schema(value, variant_schema);
707                        }
708                        _ => { /* skip */ }
709                    }
710                }
711                Err(create_error(format!(
712                    "Cannot find a matching Int-like or Long-like schema in {union_schema:?}"
713                )))
714            }
715            expected => Err(create_error(format!("Expected: {expected}. Got: Int/Long"))),
716        }
717    }
718
719    fn serialize_u64_with_schema(&mut self, value: u64, schema: &Schema) -> Result<usize, Error> {
720        let create_error = |cause: String| {
721            Error::new(Details::SerializeValueWithSchema {
722                value_type: "u64",
723                value: format!("{value}. Cause: {cause}"),
724                schema: schema.clone(),
725            })
726        };
727
728        match schema {
729            Schema::Int | Schema::TimeMillis | Schema::Date => {
730                let int_value =
731                    i32::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
732                encode_int(int_value, &mut self.writer)
733            }
734            Schema::Long
735            | Schema::TimeMicros
736            | Schema::TimestampMillis
737            | Schema::TimestampMicros
738            | Schema::TimestampNanos
739            | Schema::LocalTimestampMillis
740            | Schema::LocalTimestampMicros
741            | Schema::LocalTimestampNanos => {
742                let long_value =
743                    i64::try_from(value).map_err(|cause| create_error(cause.to_string()))?;
744                encode_long(long_value, &mut self.writer)
745            }
746            Schema::Union(union_schema) => {
747                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
748                    match variant_schema {
749                        Schema::Int
750                        | Schema::TimeMillis
751                        | Schema::Date
752                        | Schema::Long
753                        | Schema::TimeMicros
754                        | Schema::TimestampMillis
755                        | Schema::TimestampMicros
756                        | Schema::TimestampNanos
757                        | Schema::LocalTimestampMillis
758                        | Schema::LocalTimestampMicros
759                        | Schema::LocalTimestampNanos => {
760                            encode_int(i as i32, &mut *self.writer)?;
761                            return self.serialize_u64_with_schema(value, variant_schema);
762                        }
763                        _ => { /* skip */ }
764                    }
765                }
766                Err(create_error(format!(
767                    "Cannot find a matching Int-like or Long-like schema in {:?}",
768                    union_schema.schemas
769                )))
770            }
771            expected => Err(create_error(format!("Expected {expected}. Got: Int/Long"))),
772        }
773    }
774
775    fn serialize_f32_with_schema(&mut self, value: f32, schema: &Schema) -> Result<usize, Error> {
776        let create_error = |cause: String| {
777            Error::new(Details::SerializeValueWithSchema {
778                value_type: "f32",
779                value: format!("{value}. Cause: {cause}"),
780                schema: schema.clone(),
781            })
782        };
783
784        match schema {
785            Schema::Float => self
786                .writer
787                .write(&value.to_le_bytes())
788                .map_err(|e| Details::WriteBytes(e).into()),
789            Schema::Double => self
790                .writer
791                .write(&(value as f64).to_le_bytes())
792                .map_err(|e| Details::WriteBytes(e).into()),
793            Schema::Union(union_schema) => {
794                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
795                    match variant_schema {
796                        Schema::Float | Schema::Double => {
797                            encode_int(i as i32, &mut *self.writer)?;
798                            return self.serialize_f32_with_schema(value, variant_schema);
799                        }
800                        _ => { /* skip */ }
801                    }
802                }
803                Err(create_error(format!(
804                    "Cannot find a Float schema in {:?}",
805                    union_schema.schemas
806                )))
807            }
808            expected => Err(create_error(format!("Expected: {expected}. Got: Float"))),
809        }
810    }
811
812    fn serialize_f64_with_schema(&mut self, value: f64, schema: &Schema) -> Result<usize, Error> {
813        let create_error = |cause: String| {
814            Error::new(Details::SerializeValueWithSchema {
815                value_type: "f64",
816                value: format!("{value}. Cause: {cause}"),
817                schema: schema.clone(),
818            })
819        };
820
821        match schema {
822            Schema::Float => self
823                .writer
824                .write(&(value as f32).to_le_bytes())
825                .map_err(|e| Details::WriteBytes(e).into()),
826            Schema::Double => self
827                .writer
828                .write(&value.to_le_bytes())
829                .map_err(|e| Details::WriteBytes(e).into()),
830            Schema::Union(union_schema) => {
831                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
832                    match variant_schema {
833                        Schema::Float | Schema::Double => {
834                            encode_int(i as i32, &mut *self.writer)?;
835                            return self.serialize_f64_with_schema(value, variant_schema);
836                        }
837                        _ => { /* skip */ }
838                    }
839                }
840                Err(create_error(format!(
841                    "Cannot find a Double schema in {:?}",
842                    union_schema.schemas
843                )))
844            }
845            expected => Err(create_error(format!("Expected: {expected}. Got: Double"))),
846        }
847    }
848
849    fn serialize_char_with_schema(&mut self, value: char, schema: &Schema) -> Result<usize, Error> {
850        let create_error = |cause: String| {
851            Error::new(Details::SerializeValueWithSchema {
852                value_type: "char",
853                value: format!("{value}. Cause: {cause}"),
854                schema: schema.clone(),
855            })
856        };
857
858        match schema {
859            Schema::String | Schema::Bytes => self.write_bytes(String::from(value).as_bytes()),
860            Schema::Union(union_schema) => {
861                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
862                    match variant_schema {
863                        Schema::String | Schema::Bytes => {
864                            encode_int(i as i32, &mut *self.writer)?;
865                            return self.serialize_char_with_schema(value, variant_schema);
866                        }
867                        _ => { /* skip */ }
868                    }
869                }
870                Err(create_error(format!(
871                    "Cannot find a matching String or Bytes schema in {union_schema:?}"
872                )))
873            }
874            expected => Err(create_error(format!("Expected {expected}. Got: char"))),
875        }
876    }
877
878    fn serialize_str_with_schema(&mut self, value: &str, schema: &Schema) -> Result<usize, Error> {
879        let create_error = |cause: String| {
880            Error::new(Details::SerializeValueWithSchema {
881                value_type: "string",
882                value: format!("{value}. Cause: {cause}"),
883                schema: schema.clone(),
884            })
885        };
886
887        match schema {
888            Schema::String | Schema::Bytes | Schema::Uuid => self.write_bytes(value.as_bytes()),
889            Schema::BigDecimal => {
890                // If we get a string for a `BigDecimal` type, expect a display string representation, such as "12.75"
891                let decimal_val =
892                    BigDecimal::from_str(value).map_err(|e| create_error(e.to_string()))?;
893                let decimal_bytes = big_decimal_as_bytes(&decimal_val)?;
894                self.write_bytes(decimal_bytes.as_slice())
895            }
896            Schema::Fixed(fixed_schema) => {
897                if value.len() == fixed_schema.size {
898                    self.writer
899                        .write(value.as_bytes())
900                        .map_err(|e| Details::WriteBytes(e).into())
901                } else {
902                    Err(create_error(format!(
903                        "Fixed schema size ({}) does not match the value length ({})",
904                        fixed_schema.size,
905                        value.len()
906                    )))
907                }
908            }
909            Schema::Ref { name } => {
910                let ref_schema = self.get_ref_schema(name)?;
911                self.serialize_str_with_schema(value, ref_schema)
912            }
913            Schema::Union(union_schema) => {
914                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
915                    match variant_schema {
916                        Schema::String
917                        | Schema::Bytes
918                        | Schema::Uuid
919                        | Schema::Fixed(_)
920                        | Schema::Ref { name: _ } => {
921                            encode_int(i as i32, &mut *self.writer)?;
922                            return self.serialize_str_with_schema(value, variant_schema);
923                        }
924                        _ => { /* skip */ }
925                    }
926                }
927                Err(create_error(format!(
928                    "Expected one of the union variants {:?}. Got: String",
929                    union_schema.schemas
930                )))
931            }
932            expected => Err(create_error(format!("Expected: {expected}. Got: String"))),
933        }
934    }
935
936    fn serialize_bytes_with_schema(
937        &mut self,
938        value: &[u8],
939        schema: &Schema,
940    ) -> Result<usize, Error> {
941        let create_error = |cause: String| {
942            use std::fmt::Write;
943            let mut v_str = String::with_capacity(value.len());
944            for b in value {
945                if write!(&mut v_str, "{b:x}").is_err() {
946                    v_str.push_str("??");
947                }
948            }
949            Error::new(Details::SerializeValueWithSchema {
950                value_type: "bytes",
951                value: format!("{v_str}. Cause: {cause}"),
952                schema: schema.clone(),
953            })
954        };
955
956        match schema {
957            Schema::String | Schema::Bytes | Schema::Uuid | Schema::BigDecimal => {
958                self.write_bytes(value)
959            }
960            Schema::Fixed(fixed_schema) => {
961                if value.len() == fixed_schema.size {
962                    self.writer
963                        .write(value)
964                        .map_err(|e| Details::WriteBytes(e).into())
965                } else {
966                    Err(create_error(format!(
967                        "Fixed schema size ({}) does not match the value length ({})",
968                        fixed_schema.size,
969                        value.len()
970                    )))
971                }
972            }
973            Schema::Duration => {
974                if value.len() == 12 {
975                    self.writer
976                        .write(value)
977                        .map_err(|e| Details::WriteBytes(e).into())
978                } else {
979                    Err(create_error(format!(
980                        "Duration length must be 12! Got ({})",
981                        value.len()
982                    )))
983                }
984            }
985            Schema::Decimal(decimal_schema) => match decimal_schema.inner.as_ref() {
986                Schema::Bytes => self.write_bytes(value),
987                Schema::Fixed(fixed_schema) => match fixed_schema.size.checked_sub(value.len()) {
988                    Some(pad) => {
989                        let pad_val = match value.len() {
990                            0 => 0,
991                            _ => value[0],
992                        };
993                        let padding = vec![pad_val; pad];
994                        self.writer
995                            .write(padding.as_slice())
996                            .map_err(Details::WriteBytes)?;
997                        self.writer
998                            .write(value)
999                            .map_err(|e| Details::WriteBytes(e).into())
1000                    }
1001                    None => Err(Details::CompareFixedSizes {
1002                        size: fixed_schema.size,
1003                        n: value.len(),
1004                    }
1005                    .into()),
1006                },
1007                unsupported => Err(create_error(format!(
1008                    "Decimal schema's inner should be Bytes or Fixed schema. Got: {unsupported}"
1009                ))),
1010            },
1011            Schema::Ref { name } => {
1012                let ref_schema = self.get_ref_schema(name)?;
1013                self.serialize_bytes_with_schema(value, ref_schema)
1014            }
1015            Schema::Union(union_schema) => {
1016                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1017                    match variant_schema {
1018                        Schema::String
1019                        | Schema::Bytes
1020                        | Schema::Uuid
1021                        | Schema::BigDecimal
1022                        | Schema::Fixed(_)
1023                        | Schema::Duration
1024                        | Schema::Decimal(_)
1025                        | Schema::Ref { name: _ } => {
1026                            encode_int(i as i32, &mut *self.writer)?;
1027                            return self.serialize_bytes_with_schema(value, variant_schema);
1028                        }
1029                        _ => { /* skip */ }
1030                    }
1031                }
1032                Err(create_error(format!(
1033                    "Cannot find a matching String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal or Ref schema in {union_schema:?}"
1034                )))
1035            }
1036            unsupported => Err(create_error(format!(
1037                "Expected String, Bytes, Uuid, BigDecimal, Fixed, Duration, Decimal, Ref or Union schema. Got: {unsupported}"
1038            ))),
1039        }
1040    }
1041
1042    fn serialize_none_with_schema(&mut self, schema: &Schema) -> Result<usize, Error> {
1043        let create_error = |cause: String| {
1044            Error::new(Details::SerializeValueWithSchema {
1045                value_type: "none",
1046                value: format!("None. Cause: {cause}"),
1047                schema: schema.clone(),
1048            })
1049        };
1050
1051        match schema {
1052            Schema::Null => Ok(0),
1053            Schema::Union(union_schema) => {
1054                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1055                    match variant_schema {
1056                        Schema::Null => {
1057                            return encode_int(i as i32, &mut *self.writer);
1058                        }
1059                        _ => { /* skip */ }
1060                    }
1061                }
1062                Err(create_error(format!(
1063                    "Cannot find a matching Null schema in {:?}",
1064                    union_schema.schemas
1065                )))
1066            }
1067            expected => Err(create_error(format!("Expected: {expected}. Got: Null"))),
1068        }
1069    }
1070
1071    fn serialize_some_with_schema<T>(&mut self, value: &T, schema: &Schema) -> Result<usize, Error>
1072    where
1073        T: ?Sized + ser::Serialize,
1074    {
1075        let create_error = |cause: String| {
1076            Error::new(Details::SerializeValueWithSchema {
1077                value_type: "some",
1078                value: format!("Some(?). Cause: {cause}"),
1079                schema: schema.clone(),
1080            })
1081        };
1082
1083        match schema {
1084            Schema::Union(union_schema) => {
1085                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1086                    match variant_schema {
1087                        Schema::Null => { /* skip */ }
1088                        _ => {
1089                            encode_long(i as i64, &mut *self.writer)?;
1090                            let mut variant_ser = SchemaAwareWriteSerializer::new(
1091                                &mut *self.writer,
1092                                variant_schema,
1093                                self.names,
1094                                self.enclosing_namespace.clone(),
1095                            );
1096                            return value.serialize(&mut variant_ser);
1097                        }
1098                    }
1099                }
1100                Err(create_error(format!(
1101                    "Cannot find a matching Null schema in {:?}",
1102                    union_schema.schemas
1103                )))
1104            }
1105            _ => value.serialize(self),
1106        }
1107    }
1108
1109    fn serialize_unit_struct_with_schema(
1110        &mut self,
1111        name: &'static str,
1112        schema: &Schema,
1113    ) -> Result<usize, Error> {
1114        let create_error = |cause: String| {
1115            Error::new(Details::SerializeValueWithSchema {
1116                value_type: "unit struct",
1117                value: format!("{name}. Cause: {cause}"),
1118                schema: schema.clone(),
1119            })
1120        };
1121
1122        match schema {
1123            Schema::Record(sch) => match sch.fields.len() {
1124                0 => Ok(0),
1125                too_many => Err(create_error(format!(
1126                    "Too many fields: {too_many}. Expected: 0"
1127                ))),
1128            },
1129            Schema::Null => Ok(0),
1130            Schema::Ref { name: ref_name } => {
1131                let ref_schema = self.get_ref_schema(ref_name)?;
1132                self.serialize_unit_struct_with_schema(name, ref_schema)
1133            }
1134            Schema::Union(union_schema) => {
1135                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1136                    match variant_schema {
1137                        Schema::Record(record_schema) if record_schema.fields.is_empty() => {
1138                            encode_int(i as i32, &mut *self.writer)?;
1139                            return self.serialize_unit_struct_with_schema(name, variant_schema);
1140                        }
1141                        Schema::Null | Schema::Ref { name: _ } => {
1142                            encode_int(i as i32, &mut *self.writer)?;
1143                            return self.serialize_unit_struct_with_schema(name, variant_schema);
1144                        }
1145                        _ => { /* skip */ }
1146                    }
1147                }
1148                Err(create_error(format!(
1149                    "Cannot find a matching Null schema in {union_schema:?}"
1150                )))
1151            }
1152            unsupported => Err(create_error(format!(
1153                "Expected Null or Union schema. Got: {unsupported}"
1154            ))),
1155        }
1156    }
1157
1158    fn serialize_unit_variant_with_schema(
1159        &mut self,
1160        name: &'static str,
1161        variant_index: u32,
1162        variant: &'static str,
1163        schema: &Schema,
1164    ) -> Result<usize, Error> {
1165        let create_error = |cause: String| {
1166            Error::new(Details::SerializeValueWithSchema {
1167                value_type: "unit variant",
1168                value: format!("{name}::{variant} (index={variant_index}). Cause: {cause}"),
1169                schema: schema.clone(),
1170            })
1171        };
1172
1173        match schema {
1174            Schema::Enum(enum_schema) => {
1175                if variant_index as usize >= enum_schema.symbols.len() {
1176                    return Err(create_error(format!(
1177                        "Variant index out of bounds: {}. The Enum schema has '{}' symbols",
1178                        variant_index,
1179                        enum_schema.symbols.len()
1180                    )));
1181                }
1182
1183                encode_int(variant_index as i32, &mut self.writer)
1184            }
1185            Schema::Union(union_schema) => {
1186                if variant_index as usize >= union_schema.schemas.len() {
1187                    return Err(create_error(format!(
1188                        "Variant index out of bounds: {}. The union schema has '{}' schemas",
1189                        variant_index,
1190                        union_schema.schemas.len()
1191                    )));
1192                }
1193
1194                encode_int(variant_index as i32, &mut self.writer)?;
1195                self.serialize_unit_struct_with_schema(
1196                    name,
1197                    &union_schema.schemas[variant_index as usize],
1198                )
1199            }
1200            Schema::Ref { name: ref_name } => {
1201                let ref_schema = self.get_ref_schema(ref_name)?;
1202                self.serialize_unit_variant_with_schema(name, variant_index, variant, ref_schema)
1203            }
1204            unsupported => Err(create_error(format!(
1205                "Unsupported schema: {unsupported:?}. Expected: Enum, Union or Ref"
1206            ))),
1207        }
1208    }
1209
1210    fn serialize_newtype_struct_with_schema<T>(
1211        &mut self,
1212        _name: &'static str,
1213        value: &T,
1214        schema: &Schema,
1215    ) -> Result<usize, Error>
1216    where
1217        T: ?Sized + ser::Serialize,
1218    {
1219        let mut inner_ser = SchemaAwareWriteSerializer::new(
1220            &mut *self.writer,
1221            schema,
1222            self.names,
1223            self.enclosing_namespace.clone(),
1224        );
1225        // Treat any newtype struct as a transparent wrapper around the contained type
1226        value.serialize(&mut inner_ser)
1227    }
1228
1229    fn serialize_newtype_variant_with_schema<T>(
1230        &mut self,
1231        name: &'static str,
1232        variant_index: u32,
1233        variant: &'static str,
1234        value: &T,
1235        schema: &Schema,
1236    ) -> Result<usize, Error>
1237    where
1238        T: ?Sized + ser::Serialize,
1239    {
1240        let create_error = |cause: String| {
1241            Error::new(Details::SerializeValueWithSchema {
1242                value_type: "newtype variant",
1243                value: format!("{name}::{variant}(?) (index={variant_index}). Cause: {cause}"),
1244                schema: schema.clone(),
1245            })
1246        };
1247
1248        match schema {
1249            Schema::Union(union_schema) => {
1250                let variant_schema = union_schema
1251                    .schemas
1252                    .get(variant_index as usize)
1253                    .ok_or_else(|| {
1254                        create_error(format!(
1255                            "No variant schema at position {variant_index} for {union_schema:?}"
1256                        ))
1257                    })?;
1258
1259                encode_int(variant_index as i32, &mut self.writer)?;
1260                self.serialize_newtype_struct_with_schema(variant, value, variant_schema)
1261            }
1262            _ => Err(create_error(format!(
1263                "Expected Union schema. Got: {schema}"
1264            ))),
1265        }
1266    }
1267
1268    fn serialize_seq_with_schema<'a>(
1269        &'a mut self,
1270        len: Option<usize>,
1271        schema: &'s Schema,
1272    ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> {
1273        let create_error = |cause: String| {
1274            let len_str = len
1275                .map(|l| format!("{l}"))
1276                .unwrap_or_else(|| String::from("?"));
1277
1278            Error::new(Details::SerializeValueWithSchema {
1279                value_type: "sequence",
1280                value: format!("sequence (len={len_str}). Cause: {cause}"),
1281                schema: schema.clone(),
1282            })
1283        };
1284
1285        match schema {
1286            Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new(
1287                self,
1288                array_schema.items.as_ref(),
1289                len,
1290            )),
1291            Schema::Union(union_schema) => {
1292                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1293                    match variant_schema {
1294                        Schema::Array(_) => {
1295                            encode_int(i as i32, &mut *self.writer)?;
1296                            return self.serialize_seq_with_schema(len, variant_schema);
1297                        }
1298                        _ => { /* skip */ }
1299                    }
1300                }
1301                Err(create_error(format!(
1302                    "Expected Array schema in {union_schema:?}"
1303                )))
1304            }
1305            _ => Err(create_error(format!("Expected: {schema}. Got: Array"))),
1306        }
1307    }
1308
1309    fn serialize_tuple_with_schema<'a>(
1310        &'a mut self,
1311        len: usize,
1312        schema: &'s Schema,
1313    ) -> Result<SchemaAwareWriteSerializeSeq<'a, 's, W>, Error> {
1314        let create_error = |cause: String| {
1315            Error::new(Details::SerializeValueWithSchema {
1316                value_type: "tuple",
1317                value: format!("tuple (len={len}). Cause: {cause}"),
1318                schema: schema.clone(),
1319            })
1320        };
1321
1322        match schema {
1323            Schema::Array(array_schema) => Ok(SchemaAwareWriteSerializeSeq::new(
1324                self,
1325                array_schema.items.as_ref(),
1326                Some(len),
1327            )),
1328            Schema::Union(union_schema) => {
1329                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1330                    match variant_schema {
1331                        Schema::Array(_) => {
1332                            encode_int(i as i32, &mut *self.writer)?;
1333                            return self.serialize_tuple_with_schema(len, variant_schema);
1334                        }
1335                        _ => { /* skip */ }
1336                    }
1337                }
1338                Err(create_error(format!(
1339                    "Expected Array schema in {union_schema:?}"
1340                )))
1341            }
1342            _ => Err(create_error(format!("Expected: {schema}. Got: Array"))),
1343        }
1344    }
1345
1346    fn serialize_tuple_struct_with_schema<'a>(
1347        &'a mut self,
1348        name: &'static str,
1349        len: usize,
1350        schema: &'s Schema,
1351    ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> {
1352        let create_error = |cause: String| {
1353            Error::new(Details::SerializeValueWithSchema {
1354                value_type: "tuple struct",
1355                value: format!(
1356                    "{name}({}). Cause: {cause}",
1357                    vec!["?"; len].as_slice().join(",")
1358                ),
1359                schema: schema.clone(),
1360            })
1361        };
1362
1363        match schema {
1364            Schema::Array(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Array(
1365                SchemaAwareWriteSerializeSeq::new(self, &sch.items, Some(len)),
1366            )),
1367            Schema::Record(sch) => Ok(SchemaAwareWriteSerializeTupleStruct::Record(
1368                SchemaAwareWriteSerializeStruct::new(self, sch),
1369            )),
1370            Schema::Ref { name: ref_name } => {
1371                let ref_schema = self.get_ref_schema(ref_name)?;
1372                self.serialize_tuple_struct_with_schema(name, len, ref_schema)
1373            }
1374            Schema::Union(union_schema) => {
1375                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1376                    match variant_schema {
1377                        Schema::Record(inner) => {
1378                            if inner.fields.len() == len {
1379                                encode_int(i as i32, &mut *self.writer)?;
1380                                return self.serialize_tuple_struct_with_schema(
1381                                    name,
1382                                    len,
1383                                    variant_schema,
1384                                );
1385                            }
1386                        }
1387                        Schema::Array(_) | Schema::Ref { name: _ } => {
1388                            encode_int(i as i32, &mut *self.writer)?;
1389                            return self.serialize_tuple_struct_with_schema(
1390                                name,
1391                                len,
1392                                variant_schema,
1393                            );
1394                        }
1395                        _ => { /* skip */ }
1396                    }
1397                }
1398                Err(create_error(format!(
1399                    "Expected Record, Array or Ref schema in {union_schema:?}"
1400                )))
1401            }
1402            _ => Err(create_error(format!(
1403                "Expected Record, Array, Ref or Union schema. Got: {schema}"
1404            ))),
1405        }
1406    }
1407
1408    fn serialize_tuple_variant_with_schema<'a>(
1409        &'a mut self,
1410        name: &'static str,
1411        variant_index: u32,
1412        variant: &'static str,
1413        len: usize,
1414        schema: &'s Schema,
1415    ) -> Result<SchemaAwareWriteSerializeTupleStruct<'a, 's, W>, Error> {
1416        let create_error = |cause: String| {
1417            Error::new(Details::SerializeValueWithSchema {
1418                value_type: "tuple variant",
1419                value: format!(
1420                    "{name}::{variant}({}) (index={variant_index}). Cause: {cause}",
1421                    vec!["?"; len].as_slice().join(",")
1422                ),
1423                schema: schema.clone(),
1424            })
1425        };
1426
1427        match schema {
1428            Schema::Union(union_schema) => {
1429                let variant_schema = union_schema
1430                    .schemas
1431                    .get(variant_index as usize)
1432                    .ok_or_else(|| {
1433                        create_error(format!(
1434                            "Cannot find a variant at position {variant_index} in {union_schema:?}"
1435                        ))
1436                    })?;
1437
1438                encode_int(variant_index as i32, &mut self.writer)?;
1439                self.serialize_tuple_struct_with_schema(variant, len, variant_schema)
1440            }
1441            _ => Err(create_error(format!(
1442                "Expected Union schema. Got: {schema}"
1443            ))),
1444        }
1445    }
1446
1447    fn serialize_map_with_schema<'a>(
1448        &'a mut self,
1449        len: Option<usize>,
1450        schema: &'s Schema,
1451    ) -> Result<SchemaAwareWriteSerializeMap<'a, 's, W>, Error> {
1452        let create_error = |cause: String| {
1453            let len_str = len
1454                .map(|l| format!("{l}"))
1455                .unwrap_or_else(|| String::from("?"));
1456
1457            Error::new(Details::SerializeValueWithSchema {
1458                value_type: "map",
1459                value: format!("map (size={len_str}). Cause: {cause}"),
1460                schema: schema.clone(),
1461            })
1462        };
1463
1464        match schema {
1465            Schema::Map(map_schema) => Ok(SchemaAwareWriteSerializeMap::new(
1466                self,
1467                map_schema.types.as_ref(),
1468                len,
1469            )),
1470            Schema::Union(union_schema) => {
1471                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1472                    match variant_schema {
1473                        Schema::Map(_) => {
1474                            encode_int(i as i32, &mut *self.writer)?;
1475                            return self.serialize_map_with_schema(len, variant_schema);
1476                        }
1477                        _ => { /* skip */ }
1478                    }
1479                }
1480                Err(create_error(format!(
1481                    "Expected a Map schema in {union_schema:?}"
1482                )))
1483            }
1484            _ => Err(create_error(format!(
1485                "Expected Map or Union schema. Got: {schema}"
1486            ))),
1487        }
1488    }
1489
1490    fn serialize_struct_with_schema<'a>(
1491        &'a mut self,
1492        name: &'static str,
1493        len: usize,
1494        schema: &'s Schema,
1495    ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> {
1496        let create_error = |cause: String| {
1497            Error::new(Details::SerializeValueWithSchema {
1498                value_type: "struct",
1499                value: format!("{name}{{ ... }}. Cause: {cause}"),
1500                schema: schema.clone(),
1501            })
1502        };
1503
1504        match schema {
1505            Schema::Record(record_schema) => {
1506                Ok(SchemaAwareWriteSerializeStruct::new(self, record_schema))
1507            }
1508            Schema::Ref { name: ref_name } => {
1509                let ref_schema = self.get_ref_schema(ref_name)?;
1510                self.serialize_struct_with_schema(name, len, ref_schema)
1511            }
1512            Schema::Union(union_schema) => {
1513                for (i, variant_schema) in union_schema.schemas.iter().enumerate() {
1514                    match variant_schema {
1515                        Schema::Record(inner)
1516                            if inner.fields.len() == len && inner.name.name == name =>
1517                        {
1518                            encode_int(i as i32, &mut *self.writer)?;
1519                            return self.serialize_struct_with_schema(name, len, variant_schema);
1520                        }
1521                        Schema::Ref { name: _ } => {
1522                            encode_int(i as i32, &mut *self.writer)?;
1523                            return self.serialize_struct_with_schema(name, len, variant_schema);
1524                        }
1525                        _ => { /* skip */ }
1526                    }
1527                }
1528                Err(create_error(format!(
1529                    "Expected Record or Ref schema in {union_schema:?}"
1530                )))
1531            }
1532            _ => Err(create_error(format!(
1533                "Expected Record, Ref or Union schema. Got: {schema}"
1534            ))),
1535        }
1536    }
1537
1538    fn serialize_struct_variant_with_schema<'a>(
1539        &'a mut self,
1540        name: &'static str,
1541        variant_index: u32,
1542        variant: &'static str,
1543        len: usize,
1544        schema: &'s Schema,
1545    ) -> Result<SchemaAwareWriteSerializeStruct<'a, 's, W>, Error> {
1546        let create_error = |cause: String| {
1547            Error::new(Details::SerializeValueWithSchema {
1548                value_type: "struct variant",
1549                value: format!("{name}::{variant}{{ ... }} (size={len}. Cause: {cause})"),
1550                schema: schema.clone(),
1551            })
1552        };
1553
1554        match schema {
1555            Schema::Union(union_schema) => {
1556                let variant_schema = union_schema
1557                    .schemas
1558                    .get(variant_index as usize)
1559                    .ok_or_else(|| {
1560                        create_error(format!(
1561                            "Cannot find variant at position {variant_index} in {union_schema:?}"
1562                        ))
1563                    })?;
1564
1565                encode_int(variant_index as i32, &mut self.writer)?;
1566                self.serialize_struct_with_schema(variant, len, variant_schema)
1567            }
1568            _ => Err(create_error(format!(
1569                "Expected Union schema. Got: {schema}"
1570            ))),
1571        }
1572    }
1573}
1574
1575impl<'a, 's, W: Write> ser::Serializer for &'a mut SchemaAwareWriteSerializer<'s, W> {
1576    type Ok = usize;
1577    type Error = Error;
1578    type SerializeSeq = SchemaAwareWriteSerializeSeq<'a, 's, W>;
1579    type SerializeTuple = SchemaAwareWriteSerializeSeq<'a, 's, W>;
1580    type SerializeTupleStruct = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>;
1581    type SerializeTupleVariant = SchemaAwareWriteSerializeTupleStruct<'a, 's, W>;
1582    type SerializeMap = SchemaAwareWriteSerializeMap<'a, 's, W>;
1583    type SerializeStruct = SchemaAwareWriteSerializeStruct<'a, 's, W>;
1584    type SerializeStructVariant = SchemaAwareWriteSerializeStruct<'a, 's, W>;
1585
1586    fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
1587        self.serialize_bool_with_schema(v, self.root_schema)
1588    }
1589
1590    fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
1591        self.serialize_i32(v as i32)
1592    }
1593
1594    fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
1595        self.serialize_i32(v as i32)
1596    }
1597
1598    fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
1599        self.serialize_i32_with_schema(v, self.root_schema)
1600    }
1601
1602    fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
1603        self.serialize_i64_with_schema(v, self.root_schema)
1604    }
1605
1606    fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
1607        self.serialize_u8_with_schema(v, self.root_schema)
1608    }
1609
1610    fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
1611        self.serialize_u32(v as u32)
1612    }
1613
1614    fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
1615        self.serialize_u32_with_schema(v, self.root_schema)
1616    }
1617
1618    fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
1619        self.serialize_u64_with_schema(v, self.root_schema)
1620    }
1621
1622    fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
1623        self.serialize_f32_with_schema(v, self.root_schema)
1624    }
1625
1626    fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
1627        self.serialize_f64_with_schema(v, self.root_schema)
1628    }
1629
1630    fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> {
1631        self.serialize_char_with_schema(v, self.root_schema)
1632    }
1633
1634    fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
1635        self.serialize_str_with_schema(v, self.root_schema)
1636    }
1637
1638    fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
1639        self.serialize_bytes_with_schema(v, self.root_schema)
1640    }
1641
1642    fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
1643        self.serialize_none_with_schema(self.root_schema)
1644    }
1645
1646    fn serialize_some<T>(self, value: &T) -> Result<Self::Ok, Self::Error>
1647    where
1648        T: ?Sized + ser::Serialize,
1649    {
1650        self.serialize_some_with_schema(value, self.root_schema)
1651    }
1652
1653    fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
1654        self.serialize_none()
1655    }
1656
1657    fn serialize_unit_struct(self, name: &'static str) -> Result<Self::Ok, Self::Error> {
1658        self.serialize_unit_struct_with_schema(name, self.root_schema)
1659    }
1660
1661    fn serialize_unit_variant(
1662        self,
1663        name: &'static str,
1664        variant_index: u32,
1665        variant: &'static str,
1666    ) -> Result<Self::Ok, Self::Error> {
1667        self.serialize_unit_variant_with_schema(name, variant_index, variant, self.root_schema)
1668    }
1669
1670    fn serialize_newtype_struct<T>(
1671        self,
1672        name: &'static str,
1673        value: &T,
1674    ) -> Result<Self::Ok, Self::Error>
1675    where
1676        T: ?Sized + ser::Serialize,
1677    {
1678        self.serialize_newtype_struct_with_schema(name, value, self.root_schema)
1679    }
1680
1681    fn serialize_newtype_variant<T>(
1682        self,
1683        name: &'static str,
1684        variant_index: u32,
1685        variant: &'static str,
1686        value: &T,
1687    ) -> Result<Self::Ok, Self::Error>
1688    where
1689        T: ?Sized + ser::Serialize,
1690    {
1691        self.serialize_newtype_variant_with_schema(
1692            name,
1693            variant_index,
1694            variant,
1695            value,
1696            self.root_schema,
1697        )
1698    }
1699
1700    fn serialize_seq(self, len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
1701        self.serialize_seq_with_schema(len, self.root_schema)
1702    }
1703
1704    fn serialize_tuple(self, len: usize) -> Result<Self::SerializeTuple, Self::Error> {
1705        self.serialize_tuple_with_schema(len, self.root_schema)
1706    }
1707
1708    fn serialize_tuple_struct(
1709        self,
1710        name: &'static str,
1711        len: usize,
1712    ) -> Result<Self::SerializeTupleStruct, Self::Error> {
1713        self.serialize_tuple_struct_with_schema(name, len, self.root_schema)
1714    }
1715
1716    fn serialize_tuple_variant(
1717        self,
1718        name: &'static str,
1719        variant_index: u32,
1720        variant: &'static str,
1721        len: usize,
1722    ) -> Result<Self::SerializeTupleVariant, Self::Error> {
1723        self.serialize_tuple_variant_with_schema(
1724            name,
1725            variant_index,
1726            variant,
1727            len,
1728            self.root_schema,
1729        )
1730    }
1731
1732    fn serialize_map(self, len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
1733        self.serialize_map_with_schema(len, self.root_schema)
1734    }
1735
1736    fn serialize_struct(
1737        self,
1738        name: &'static str,
1739        len: usize,
1740    ) -> Result<Self::SerializeStruct, Self::Error> {
1741        self.serialize_struct_with_schema(name, len, self.root_schema)
1742    }
1743
1744    fn serialize_struct_variant(
1745        self,
1746        name: &'static str,
1747        variant_index: u32,
1748        variant: &'static str,
1749        len: usize,
1750    ) -> Result<Self::SerializeStructVariant, Self::Error> {
1751        self.serialize_struct_variant_with_schema(
1752            name,
1753            variant_index,
1754            variant,
1755            len,
1756            self.root_schema,
1757        )
1758    }
1759
1760    fn is_human_readable(&self) -> bool {
1761        crate::util::is_human_readable()
1762    }
1763}
1764
1765#[cfg(test)]
1766mod tests {
1767    use super::*;
1768    use crate::{
1769        Days, Duration, Millis, Months, decimal::Decimal, error::Details, schema::ResolvedSchema,
1770    };
1771    use apache_avro_test_helper::TestResult;
1772    use bigdecimal::BigDecimal;
1773    use num_bigint::{BigInt, Sign};
1774    use serde::Serialize;
1775    use serde_bytes::{ByteArray, Bytes};
1776    use serial_test::serial;
1777    use std::{
1778        collections::{BTreeMap, HashMap},
1779        marker::PhantomData,
1780        sync::atomic::Ordering,
1781    };
1782    use uuid::Uuid;
1783
1784    #[test]
1785    fn test_serialize_null() -> TestResult {
1786        let schema = Schema::Null;
1787        let mut buffer: Vec<u8> = Vec::new();
1788        let names = HashMap::new();
1789        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1790
1791        ().serialize(&mut serializer)?;
1792        None::<()>.serialize(&mut serializer)?;
1793        None::<i32>.serialize(&mut serializer)?;
1794        None::<String>.serialize(&mut serializer)?;
1795        assert!("".serialize(&mut serializer).is_err());
1796        assert!(Some("").serialize(&mut serializer).is_err());
1797
1798        assert_eq!(buffer.as_slice(), Vec::<u8>::new().as_slice());
1799
1800        Ok(())
1801    }
1802
1803    #[test]
1804    fn test_serialize_bool() -> TestResult {
1805        let schema = Schema::Boolean;
1806        let mut buffer: Vec<u8> = Vec::new();
1807        let names = HashMap::new();
1808        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1809
1810        true.serialize(&mut serializer)?;
1811        false.serialize(&mut serializer)?;
1812        assert!("".serialize(&mut serializer).is_err());
1813        assert!(Some("").serialize(&mut serializer).is_err());
1814
1815        assert_eq!(buffer.as_slice(), &[1, 0]);
1816
1817        Ok(())
1818    }
1819
1820    #[test]
1821    fn test_serialize_int() -> TestResult {
1822        let schema = Schema::Int;
1823        let mut buffer: Vec<u8> = Vec::new();
1824        let names = HashMap::new();
1825        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1826
1827        4u8.serialize(&mut serializer)?;
1828        31u16.serialize(&mut serializer)?;
1829        13u32.serialize(&mut serializer)?;
1830        7i8.serialize(&mut serializer)?;
1831        (-57i16).serialize(&mut serializer)?;
1832        129i32.serialize(&mut serializer)?;
1833        assert!("".serialize(&mut serializer).is_err());
1834        assert!(Some("").serialize(&mut serializer).is_err());
1835
1836        assert_eq!(buffer.as_slice(), &[8, 62, 26, 14, 113, 130, 2]);
1837
1838        Ok(())
1839    }
1840
1841    #[test]
1842    fn test_serialize_long() -> TestResult {
1843        let schema = Schema::Long;
1844        let mut buffer: Vec<u8> = Vec::new();
1845        let names = HashMap::new();
1846        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1847
1848        4u8.serialize(&mut serializer)?;
1849        31u16.serialize(&mut serializer)?;
1850        13u32.serialize(&mut serializer)?;
1851        291u64.serialize(&mut serializer)?;
1852        7i8.serialize(&mut serializer)?;
1853        (-57i16).serialize(&mut serializer)?;
1854        129i32.serialize(&mut serializer)?;
1855        (-432i64).serialize(&mut serializer)?;
1856        assert!("".serialize(&mut serializer).is_err());
1857        assert!(Some("").serialize(&mut serializer).is_err());
1858
1859        assert_eq!(
1860            buffer.as_slice(),
1861            &[8, 62, 26, 198, 4, 14, 113, 130, 2, 223, 6]
1862        );
1863
1864        Ok(())
1865    }
1866
1867    #[test]
1868    fn test_serialize_float() -> TestResult {
1869        let schema = Schema::Float;
1870        let mut buffer: Vec<u8> = Vec::new();
1871        let names = HashMap::new();
1872        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1873
1874        4.7f32.serialize(&mut serializer)?;
1875        (-14.1f64).serialize(&mut serializer)?;
1876        assert!("".serialize(&mut serializer).is_err());
1877        assert!(Some("").serialize(&mut serializer).is_err());
1878
1879        assert_eq!(buffer.as_slice(), &[102, 102, 150, 64, 154, 153, 97, 193]);
1880
1881        Ok(())
1882    }
1883
1884    #[test]
1885    fn test_serialize_double() -> TestResult {
1886        let schema = Schema::Float;
1887        let mut buffer: Vec<u8> = Vec::new();
1888        let names = HashMap::new();
1889        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1890
1891        4.7f32.serialize(&mut serializer)?;
1892        (-14.1f64).serialize(&mut serializer)?;
1893        assert!("".serialize(&mut serializer).is_err());
1894        assert!(Some("").serialize(&mut serializer).is_err());
1895
1896        assert_eq!(buffer.as_slice(), &[102, 102, 150, 64, 154, 153, 97, 193]);
1897
1898        Ok(())
1899    }
1900
1901    #[test]
1902    fn test_serialize_bytes() -> TestResult {
1903        let schema = Schema::Bytes;
1904        let mut buffer: Vec<u8> = Vec::new();
1905        let names = HashMap::new();
1906        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1907
1908        'a'.serialize(&mut serializer)?;
1909        "test".serialize(&mut serializer)?;
1910        Bytes::new(&[12, 3, 7, 91, 4]).serialize(&mut serializer)?;
1911        assert!(().serialize(&mut serializer).is_err());
1912        assert!(PhantomData::<String>.serialize(&mut serializer).is_err());
1913
1914        assert_eq!(
1915            buffer.as_slice(),
1916            &[2, b'a', 8, b't', b'e', b's', b't', 10, 12, 3, 7, 91, 4]
1917        );
1918
1919        Ok(())
1920    }
1921
1922    #[test]
1923    fn test_serialize_string() -> TestResult {
1924        let schema = Schema::String;
1925        let mut buffer: Vec<u8> = Vec::new();
1926        let names = HashMap::new();
1927        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1928
1929        'a'.serialize(&mut serializer)?;
1930        "test".serialize(&mut serializer)?;
1931        Bytes::new(&[12, 3, 7, 91, 4]).serialize(&mut serializer)?;
1932        assert!(().serialize(&mut serializer).is_err());
1933        assert!(PhantomData::<String>.serialize(&mut serializer).is_err());
1934
1935        assert_eq!(
1936            buffer.as_slice(),
1937            &[2, b'a', 8, b't', b'e', b's', b't', 10, 12, 3, 7, 91, 4]
1938        );
1939
1940        Ok(())
1941    }
1942
1943    #[test]
1944    fn test_serialize_record() -> TestResult {
1945        let schema = Schema::parse_str(
1946            r#"{
1947            "type": "record",
1948            "name": "TestRecord",
1949            "fields": [
1950                {"name": "stringField", "type": "string"},
1951                {"name": "intField", "type": "int"}
1952            ]
1953        }"#,
1954        )?;
1955
1956        #[derive(Serialize)]
1957        #[serde(rename_all = "camelCase")]
1958        struct GoodTestRecord {
1959            string_field: String,
1960            int_field: i32,
1961        }
1962
1963        #[derive(Serialize)]
1964        #[serde(rename_all = "camelCase")]
1965        struct BadTestRecord {
1966            foo_string_field: String,
1967            bar_int_field: i32,
1968        }
1969
1970        let mut buffer: Vec<u8> = Vec::new();
1971        let names = HashMap::new();
1972        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
1973
1974        let good_record = GoodTestRecord {
1975            string_field: String::from("test"),
1976            int_field: 10,
1977        };
1978        good_record.serialize(&mut serializer)?;
1979
1980        let bad_record = BadTestRecord {
1981            foo_string_field: String::from("test"),
1982            bar_int_field: 10,
1983        };
1984        assert!(bad_record.serialize(&mut serializer).is_err());
1985
1986        assert!("".serialize(&mut serializer).is_err());
1987        assert!(Some("").serialize(&mut serializer).is_err());
1988
1989        assert_eq!(buffer.as_slice(), &[8, b't', b'e', b's', b't', 20]);
1990
1991        Ok(())
1992    }
1993
1994    #[test]
1995    fn test_serialize_empty_record() -> TestResult {
1996        let schema = Schema::parse_str(
1997            r#"{
1998            "type": "record",
1999            "name": "EmptyRecord",
2000            "fields": []
2001        }"#,
2002        )?;
2003
2004        let mut buffer: Vec<u8> = Vec::new();
2005        let names = HashMap::new();
2006        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2007
2008        #[derive(Serialize)]
2009        struct EmptyRecord;
2010        EmptyRecord.serialize(&mut serializer)?;
2011
2012        #[derive(Serialize)]
2013        struct NonEmptyRecord {
2014            foo: String,
2015        }
2016        let record = NonEmptyRecord {
2017            foo: "bar".to_string(),
2018        };
2019        match record
2020            .serialize(&mut serializer)
2021            .map_err(Error::into_details)
2022        {
2023            Err(Details::FieldName(field_name)) if field_name == "foo" => (),
2024            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2025        }
2026
2027        match ().serialize(&mut serializer).map_err(Error::into_details) {
2028            Err(Details::SerializeValueWithSchema {
2029                value_type,
2030                value,
2031                schema,
2032            }) => {
2033                assert_eq!(value_type, "none"); // serialize_unit() delegates to serialize_none()
2034                assert_eq!(value, "None. Cause: Expected: Record. Got: Null");
2035                assert_eq!(schema, schema);
2036            }
2037            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2038        }
2039
2040        assert_eq!(buffer.len(), 0);
2041
2042        Ok(())
2043    }
2044
2045    #[test]
2046    fn test_serialize_enum() -> TestResult {
2047        let schema = Schema::parse_str(
2048            r#"{
2049            "type": "enum",
2050            "name": "Suit",
2051            "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
2052        }"#,
2053        )?;
2054
2055        #[derive(Serialize)]
2056        enum Suit {
2057            Spades,
2058            Hearts,
2059            Diamonds,
2060            Clubs,
2061        }
2062
2063        let mut buffer: Vec<u8> = Vec::new();
2064        let names = HashMap::new();
2065        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2066
2067        Suit::Spades.serialize(&mut serializer)?;
2068        Suit::Hearts.serialize(&mut serializer)?;
2069        Suit::Diamonds.serialize(&mut serializer)?;
2070        Suit::Clubs.serialize(&mut serializer)?;
2071        match None::<()>
2072            .serialize(&mut serializer)
2073            .map_err(Error::into_details)
2074        {
2075            Err(Details::SerializeValueWithSchema {
2076                value_type,
2077                value,
2078                schema,
2079            }) => {
2080                assert_eq!(value_type, "none");
2081                assert_eq!(value, "None. Cause: Expected: Enum. Got: Null");
2082                assert_eq!(schema, schema);
2083            }
2084            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2085        }
2086
2087        assert_eq!(buffer.as_slice(), &[0, 2, 4, 6]);
2088
2089        Ok(())
2090    }
2091
2092    #[test]
2093    fn test_serialize_array() -> TestResult {
2094        let schema = Schema::parse_str(
2095            r#"{
2096            "type": "array",
2097            "items": "long"
2098        }"#,
2099        )?;
2100
2101        let mut buffer: Vec<u8> = Vec::new();
2102        let names = HashMap::new();
2103        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2104
2105        let arr: Vec<i64> = vec![10, 5, 400];
2106        arr.serialize(&mut serializer)?;
2107
2108        match vec![1_f32]
2109            .serialize(&mut serializer)
2110            .map_err(Error::into_details)
2111        {
2112            Err(Details::SerializeValueWithSchema {
2113                value_type,
2114                value,
2115                schema,
2116            }) => {
2117                assert_eq!(value_type, "f32");
2118                assert_eq!(value, "1. Cause: Expected: Long. Got: Float");
2119                assert_eq!(schema, schema);
2120            }
2121            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2122        }
2123
2124        assert_eq!(buffer.as_slice(), &[6, 20, 10, 160, 6, 0]);
2125
2126        Ok(())
2127    }
2128
2129    #[test]
2130    fn test_serialize_map() -> TestResult {
2131        let schema = Schema::parse_str(
2132            r#"{
2133            "type": "map",
2134            "values": "long"
2135        }"#,
2136        )?;
2137
2138        let mut buffer: Vec<u8> = Vec::new();
2139        let names = HashMap::new();
2140        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2141
2142        let mut map: BTreeMap<String, i64> = BTreeMap::new();
2143        map.insert(String::from("item1"), 10);
2144        map.insert(String::from("item2"), 400);
2145
2146        map.serialize(&mut serializer)?;
2147
2148        let mut map: BTreeMap<String, &str> = BTreeMap::new();
2149        map.insert(String::from("item1"), "value1");
2150        match map.serialize(&mut serializer).map_err(Error::into_details) {
2151            Err(Details::SerializeValueWithSchema {
2152                value_type,
2153                value,
2154                schema,
2155            }) => {
2156                assert_eq!(value_type, "string");
2157                assert_eq!(value, "value1. Cause: Expected: Long. Got: String");
2158                assert_eq!(schema, schema);
2159            }
2160            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2161        }
2162
2163        assert_eq!(
2164            buffer.as_slice(),
2165            &[
2166                4, 10, b'i', b't', b'e', b'm', b'1', 20, 10, b'i', b't', b'e', b'm', b'2', 160, 6,
2167                0
2168            ]
2169        );
2170
2171        Ok(())
2172    }
2173
2174    #[test]
2175    fn test_serialize_nullable_union() -> TestResult {
2176        let schema = Schema::parse_str(
2177            r#"{
2178            "type": ["null", "long"]
2179        }"#,
2180        )?;
2181
2182        #[derive(Serialize)]
2183        enum NullableLong {
2184            Null,
2185            Long(i64),
2186        }
2187
2188        let mut buffer: Vec<u8> = Vec::new();
2189        let names = HashMap::new();
2190        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2191
2192        Some(10i64).serialize(&mut serializer)?;
2193        None::<i64>.serialize(&mut serializer)?;
2194        NullableLong::Long(400).serialize(&mut serializer)?;
2195        NullableLong::Null.serialize(&mut serializer)?;
2196
2197        match "invalid"
2198            .serialize(&mut serializer)
2199            .map_err(Error::into_details)
2200        {
2201            Err(Details::SerializeValueWithSchema {
2202                value_type,
2203                value,
2204                schema,
2205            }) => {
2206                assert_eq!(value_type, "string");
2207                assert_eq!(
2208                    value,
2209                    "invalid. Cause: Expected one of the union variants [Null, Long]. Got: String"
2210                );
2211                assert_eq!(schema, schema);
2212            }
2213            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2214        }
2215
2216        assert_eq!(buffer.as_slice(), &[2, 20, 0, 2, 160, 6, 0]);
2217
2218        Ok(())
2219    }
2220
2221    #[test]
2222    fn test_serialize_union() -> TestResult {
2223        let schema = Schema::parse_str(
2224            r#"{
2225            "type": ["null", "long", "string"]
2226        }"#,
2227        )?;
2228
2229        #[derive(Serialize)]
2230        enum LongOrString {
2231            Null,
2232            Long(i64),
2233            Str(String),
2234        }
2235
2236        let mut buffer: Vec<u8> = Vec::new();
2237        let names = HashMap::new();
2238        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2239
2240        LongOrString::Null.serialize(&mut serializer)?;
2241        LongOrString::Long(400).serialize(&mut serializer)?;
2242        LongOrString::Str(String::from("test")).serialize(&mut serializer)?;
2243
2244        match 1_f64
2245            .serialize(&mut serializer)
2246            .map_err(Error::into_details)
2247        {
2248            Err(Details::SerializeValueWithSchema {
2249                value_type,
2250                value,
2251                schema,
2252            }) => {
2253                assert_eq!(value_type, "f64");
2254                assert_eq!(
2255                    value,
2256                    "1. Cause: Cannot find a Double schema in [Null, Long, String]"
2257                );
2258                assert_eq!(schema, schema);
2259            }
2260            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2261        }
2262
2263        assert_eq!(
2264            buffer.as_slice(),
2265            &[0, 2, 160, 6, 4, 8, b't', b'e', b's', b't']
2266        );
2267
2268        Ok(())
2269    }
2270
2271    #[test]
2272    fn test_serialize_fixed() -> TestResult {
2273        let schema = Schema::parse_str(
2274            r#"{
2275            "type": "fixed",
2276            "size": 8,
2277            "name": "LongVal"
2278        }"#,
2279        )?;
2280
2281        let mut buffer: Vec<u8> = Vec::new();
2282        let names = HashMap::new();
2283        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2284
2285        Bytes::new(&[10, 124, 31, 97, 14, 201, 3, 88]).serialize(&mut serializer)?;
2286
2287        // non-8 size
2288        match Bytes::new(&[123])
2289            .serialize(&mut serializer)
2290            .map_err(Error::into_details)
2291        {
2292            Err(Details::SerializeValueWithSchema {
2293                value_type,
2294                value,
2295                schema,
2296            }) => {
2297                assert_eq!(value_type, "bytes");
2298                assert_eq!(
2299                    value,
2300                    "7b. Cause: Fixed schema size (8) does not match the value length (1)"
2301                ); // Bytes represents its values as hexadecimals: '7b' is 123
2302                assert_eq!(schema, schema);
2303            }
2304            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2305        }
2306
2307        // array
2308        match [1; 8]
2309            .serialize(&mut serializer)
2310            .map_err(Error::into_details)
2311        {
2312            Err(Details::SerializeValueWithSchema {
2313                value_type,
2314                value,
2315                schema,
2316            }) => {
2317                assert_eq!(value_type, "tuple"); // TODO: why is this 'tuple' ?!
2318                assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array");
2319                assert_eq!(schema, schema);
2320            }
2321            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2322        }
2323
2324        // slice
2325        match &[1, 2, 3, 4, 5, 6, 7, 8]
2326            .serialize(&mut serializer)
2327            .map_err(Error::into_details)
2328        {
2329            Err(Details::SerializeValueWithSchema {
2330                value_type,
2331                value,
2332                schema,
2333            }) => {
2334                assert_eq!(*value_type, "tuple"); // TODO: why is this 'tuple' ?!
2335                assert_eq!(value, "tuple (len=8). Cause: Expected: Fixed. Got: Array");
2336                assert_eq!(schema, schema);
2337            }
2338            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2339        }
2340
2341        assert_eq!(buffer.as_slice(), &[10, 124, 31, 97, 14, 201, 3, 88]);
2342
2343        Ok(())
2344    }
2345
2346    #[test]
2347    fn test_serialize_decimal_bytes() -> TestResult {
2348        let schema = Schema::parse_str(
2349            r#"{
2350            "type": "bytes",
2351            "logicalType": "decimal",
2352            "precision": 16,
2353            "scale": 2
2354        }"#,
2355        )?;
2356
2357        let mut buffer: Vec<u8> = Vec::new();
2358        let names = HashMap::new();
2359        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2360
2361        let val = Decimal::from(&[251, 155]);
2362        val.serialize(&mut serializer)?;
2363
2364        match ().serialize(&mut serializer).map_err(Error::into_details) {
2365            Err(Details::SerializeValueWithSchema {
2366                value_type,
2367                value,
2368                schema,
2369            }) => {
2370                assert_eq!(value_type, "none");
2371                assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null");
2372                assert_eq!(schema, schema);
2373            }
2374            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2375        }
2376
2377        assert_eq!(buffer.as_slice(), &[4, 251, 155]);
2378
2379        Ok(())
2380    }
2381
2382    #[test]
2383    fn test_serialize_decimal_fixed() -> TestResult {
2384        let schema = Schema::parse_str(
2385            r#"{
2386            "type": "fixed",
2387            "name": "FixedDecimal",
2388            "size": 8,
2389            "logicalType": "decimal",
2390            "precision": 16,
2391            "scale": 2
2392        }"#,
2393        )?;
2394
2395        let mut buffer: Vec<u8> = Vec::new();
2396        let names = HashMap::new();
2397        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2398
2399        let val = Decimal::from(&[0, 0, 0, 0, 0, 0, 251, 155]);
2400        val.serialize(&mut serializer)?;
2401
2402        match ().serialize(&mut serializer).map_err(Error::into_details) {
2403            Err(Details::SerializeValueWithSchema {
2404                value_type,
2405                value,
2406                schema,
2407            }) => {
2408                assert_eq!(value_type, "none");
2409                assert_eq!(value, "None. Cause: Expected: Decimal. Got: Null");
2410                assert_eq!(schema, schema);
2411            }
2412            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2413        }
2414
2415        assert_eq!(buffer.as_slice(), &[0, 0, 0, 0, 0, 0, 251, 155]);
2416
2417        Ok(())
2418    }
2419
2420    #[test]
2421    #[serial(serde_is_human_readable)]
2422    fn test_serialize_bigdecimal() -> TestResult {
2423        let schema = Schema::parse_str(
2424            r#"{
2425            "type": "bytes",
2426            "logicalType": "big-decimal"
2427        }"#,
2428        )?;
2429
2430        crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);
2431        let mut buffer: Vec<u8> = Vec::new();
2432        let names = HashMap::new();
2433        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2434
2435        let val = BigDecimal::new(BigInt::new(Sign::Plus, vec![50024]), 2);
2436        val.serialize(&mut serializer)?;
2437
2438        assert_eq!(buffer.as_slice(), &[10, 6, 0, 195, 104, 4]);
2439
2440        Ok(())
2441    }
2442
2443    #[test]
2444    #[serial(serde_is_human_readable)]
2445    fn test_serialize_uuid() -> TestResult {
2446        let schema = Schema::parse_str(
2447            r#"{
2448            "type": "string",
2449            "logicalType": "uuid"
2450        }"#,
2451        )?;
2452
2453        crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);
2454        let mut buffer: Vec<u8> = Vec::new();
2455        let names = HashMap::new();
2456        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2457
2458        "8c28da81-238c-4326-bddd-4e3d00cc5099"
2459            .parse::<Uuid>()?
2460            .serialize(&mut serializer)?;
2461
2462        match 1_u8.serialize(&mut serializer).map_err(Error::into_details) {
2463            Err(Details::SerializeValueWithSchema {
2464                value_type,
2465                value,
2466                schema,
2467            }) => {
2468                assert_eq!(value_type, "u8");
2469                assert_eq!(value, "1. Cause: Expected: Uuid. Got: Int");
2470                assert_eq!(schema, schema);
2471            }
2472            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2473        }
2474
2475        assert_eq!(
2476            buffer.as_slice(),
2477            &[
2478                72, b'8', b'c', b'2', b'8', b'd', b'a', b'8', b'1', b'-', b'2', b'3', b'8', b'c',
2479                b'-', b'4', b'3', b'2', b'6', b'-', b'b', b'd', b'd', b'd', b'-', b'4', b'e', b'3',
2480                b'd', b'0', b'0', b'c', b'c', b'5', b'0', b'9', b'9'
2481            ]
2482        );
2483
2484        Ok(())
2485    }
2486
2487    #[test]
2488    fn test_serialize_date() -> TestResult {
2489        let schema = Schema::parse_str(
2490            r#"{
2491            "type": "int",
2492            "logicalType": "date"
2493        }"#,
2494        )?;
2495
2496        let mut buffer: Vec<u8> = Vec::new();
2497        let names = HashMap::new();
2498        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2499
2500        100_u8.serialize(&mut serializer)?;
2501        1000_u16.serialize(&mut serializer)?;
2502        10000_u32.serialize(&mut serializer)?;
2503        1000_i16.serialize(&mut serializer)?;
2504        10000_i32.serialize(&mut serializer)?;
2505
2506        match 10000_f32
2507            .serialize(&mut serializer)
2508            .map_err(Error::into_details)
2509        {
2510            Err(Details::SerializeValueWithSchema {
2511                value_type,
2512                value,
2513                schema,
2514            }) => {
2515                assert_eq!(value_type, "f32");
2516                assert_eq!(value, "10000. Cause: Expected: Date. Got: Float");
2517                assert_eq!(schema, schema);
2518            }
2519            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2520        }
2521
2522        assert_eq!(
2523            buffer.as_slice(),
2524            &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1]
2525        );
2526
2527        Ok(())
2528    }
2529
2530    #[test]
2531    fn test_serialize_time_millis() -> TestResult {
2532        let schema = Schema::parse_str(
2533            r#"{
2534            "type": "int",
2535            "logicalType": "time-millis"
2536        }"#,
2537        )?;
2538
2539        let mut buffer: Vec<u8> = Vec::new();
2540        let names = HashMap::new();
2541        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2542
2543        100_u8.serialize(&mut serializer)?;
2544        1000_u16.serialize(&mut serializer)?;
2545        10000_u32.serialize(&mut serializer)?;
2546        1000_i16.serialize(&mut serializer)?;
2547        10000_i32.serialize(&mut serializer)?;
2548
2549        match 10000_f32
2550            .serialize(&mut serializer)
2551            .map_err(Error::into_details)
2552        {
2553            Err(Details::SerializeValueWithSchema {
2554                value_type,
2555                value,
2556                schema,
2557            }) => {
2558                assert_eq!(value_type, "f32");
2559                assert_eq!(value, "10000. Cause: Expected: TimeMillis. Got: Float");
2560                assert_eq!(schema, schema);
2561            }
2562            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2563        }
2564
2565        assert_eq!(
2566            buffer.as_slice(),
2567            &[200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1]
2568        );
2569
2570        Ok(())
2571    }
2572
2573    #[test]
2574    fn test_serialize_time_micros() -> TestResult {
2575        let schema = Schema::parse_str(
2576            r#"{
2577            "type": "long",
2578            "logicalType": "time-micros"
2579        }"#,
2580        )?;
2581
2582        let mut buffer: Vec<u8> = Vec::new();
2583        let names = HashMap::new();
2584        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2585
2586        100_u8.serialize(&mut serializer)?;
2587        1000_u16.serialize(&mut serializer)?;
2588        10000_u32.serialize(&mut serializer)?;
2589        1000_i16.serialize(&mut serializer)?;
2590        10000_i32.serialize(&mut serializer)?;
2591        10000_i64.serialize(&mut serializer)?;
2592
2593        match 10000_f32
2594            .serialize(&mut serializer)
2595            .map_err(Error::into_details)
2596        {
2597            Err(Details::SerializeValueWithSchema {
2598                value_type,
2599                value,
2600                schema,
2601            }) => {
2602                assert_eq!(value_type, "f32");
2603                assert_eq!(value, "10000. Cause: Expected: TimeMicros. Got: Float");
2604                assert_eq!(schema, schema);
2605            }
2606            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2607        }
2608
2609        assert_eq!(
2610            buffer.as_slice(),
2611            &[
2612                200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1
2613            ]
2614        );
2615
2616        Ok(())
2617    }
2618
2619    #[test]
2620    fn test_serialize_timestamp() -> TestResult {
2621        for precision in ["millis", "micros", "nanos"] {
2622            let schema = Schema::parse_str(&format!(
2623                r#"{{
2624                "type": "long",
2625                "logicalType": "timestamp-{precision}"
2626            }}"#
2627            ))?;
2628
2629            let mut buffer: Vec<u8> = Vec::new();
2630            let names = HashMap::new();
2631            let mut serializer =
2632                SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2633
2634            100_u8.serialize(&mut serializer)?;
2635            1000_u16.serialize(&mut serializer)?;
2636            10000_u32.serialize(&mut serializer)?;
2637            1000_i16.serialize(&mut serializer)?;
2638            10000_i32.serialize(&mut serializer)?;
2639            10000_i64.serialize(&mut serializer)?;
2640
2641            match 10000_f64
2642                .serialize(&mut serializer)
2643                .map_err(Error::into_details)
2644            {
2645                Err(Details::SerializeValueWithSchema {
2646                    value_type,
2647                    value,
2648                    schema,
2649                }) => {
2650                    let mut capital_precision = precision.to_string();
2651                    if let Some(c) = capital_precision.chars().next() {
2652                        capital_precision.replace_range(..1, &c.to_uppercase().to_string());
2653                    }
2654                    assert_eq!(value_type, "f64");
2655                    assert_eq!(
2656                        value,
2657                        format!(
2658                            "10000. Cause: Expected: Timestamp{capital_precision}. Got: Double"
2659                        )
2660                    );
2661                    assert_eq!(schema, schema);
2662                }
2663                unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2664            }
2665
2666            assert_eq!(
2667                buffer.as_slice(),
2668                &[
2669                    200, 1, 208, 15, 160, 156, 1, 208, 15, 160, 156, 1, 160, 156, 1
2670                ]
2671            );
2672        }
2673
2674        Ok(())
2675    }
2676
2677    #[test]
2678    fn test_serialize_duration() -> TestResult {
2679        let schema = Schema::parse_str(
2680            r#"{
2681            "type": "fixed",
2682            "size": 12,
2683            "name": "duration",
2684            "logicalType": "duration"
2685        }"#,
2686        )?;
2687
2688        let mut buffer: Vec<u8> = Vec::new();
2689        let names = HashMap::new();
2690        let mut serializer = SchemaAwareWriteSerializer::new(&mut buffer, &schema, &names, None);
2691
2692        let duration_bytes =
2693            ByteArray::new(Duration::new(Months::new(3), Days::new(2), Millis::new(1200)).into());
2694        duration_bytes.serialize(&mut serializer)?;
2695
2696        match [1; 12]
2697            .serialize(&mut serializer)
2698            .map_err(Error::into_details)
2699        {
2700            Err(Details::SerializeValueWithSchema {
2701                value_type,
2702                value,
2703                schema,
2704            }) => {
2705                assert_eq!(value_type, "tuple"); // TODO: why is this 'tuple' ?!
2706                assert_eq!(
2707                    value,
2708                    "tuple (len=12). Cause: Expected: Duration. Got: Array"
2709                );
2710                assert_eq!(schema, schema);
2711            }
2712            unexpected => panic!("Expected an error. Got: {unexpected:?}"),
2713        }
2714
2715        assert_eq!(buffer.as_slice(), &[3, 0, 0, 0, 2, 0, 0, 0, 176, 4, 0, 0]);
2716
2717        Ok(())
2718    }
2719
2720    #[test]
2721    #[serial(serde_is_human_readable)] // for BigDecimal and Uuid
2722    fn test_serialize_recursive_record() -> TestResult {
2723        let schema = Schema::parse_str(
2724            r#"{
2725            "type": "record",
2726            "name": "TestRecord",
2727            "fields": [
2728                {"name": "stringField", "type": "string"},
2729                {"name": "intField", "type": "int"},
2730                {"name": "bigDecimalField", "type": {"type": "bytes", "logicalType": "big-decimal"}},
2731                {"name": "uuidField", "type": "fixed", "size": 16, "logicalType": "uuid"},
2732                {"name": "innerRecord", "type": ["null", "TestRecord"]}
2733            ]
2734        }"#,
2735        )?;
2736
2737        #[derive(Serialize)]
2738        #[serde(rename_all = "camelCase")]
2739        struct TestRecord {
2740            string_field: String,
2741            int_field: i32,
2742            big_decimal_field: BigDecimal,
2743            uuid_field: Uuid,
2744            // #[serde(skip_serializing_if = "Option::is_none")] => Never ignore None!
2745            inner_record: Option<Box<TestRecord>>,
2746        }
2747
2748        crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);
2749        let mut buffer: Vec<u8> = Vec::new();
2750        let rs = ResolvedSchema::try_from(&schema)?;
2751        let mut serializer =
2752            SchemaAwareWriteSerializer::new(&mut buffer, &schema, rs.get_names(), None);
2753
2754        let good_record = TestRecord {
2755            string_field: String::from("test"),
2756            int_field: 10,
2757            big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![50024]), 2),
2758            uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5098".parse::<Uuid>()?,
2759            inner_record: Some(Box::new(TestRecord {
2760                string_field: String::from("inner_test"),
2761                int_field: 100,
2762                big_decimal_field: BigDecimal::new(BigInt::new(Sign::Plus, vec![20038]), 2),
2763                uuid_field: "8c28da81-238c-4326-bddd-4e3d00cc5099".parse::<Uuid>()?,
2764                inner_record: None,
2765            })),
2766        };
2767        good_record.serialize(&mut serializer)?;
2768
2769        assert_eq!(
2770            buffer.as_slice(),
2771            &[
2772                8, 116, 101, 115, 116, 20, 10, 6, 0, 195, 104, 4, 72, 56, 99, 50, 56, 100, 97, 56,
2773                49, 45, 50, 51, 56, 99, 45, 52, 51, 50, 54, 45, 98, 100, 100, 100, 45, 52, 101, 51,
2774                100, 48, 48, 99, 99, 53, 48, 57, 56, 2, 20, 105, 110, 110, 101, 114, 95, 116, 101,
2775                115, 116, 200, 1, 8, 4, 78, 70, 4, 72, 56, 99, 50, 56, 100, 97, 56, 49, 45, 50, 51,
2776                56, 99, 45, 52, 51, 50, 54, 45, 98, 100, 100, 100, 45, 52, 101, 51, 100, 48, 48,
2777                99, 99, 53, 48, 57, 57, 0
2778            ]
2779        );
2780
2781        Ok(())
2782    }
2783}