Skip to main content

apache_avro/serde/ser_schema/record/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18mod field_default;
19
20use std::{borrow::Borrow, cmp::Ordering, collections::HashMap, io::Write};
21
22use serde::{
23    Serialize,
24    ser::{SerializeMap, SerializeStruct, SerializeStructVariant},
25};
26
27use super::{Config, SchemaAwareSerializer};
28use crate::{
29    Error, Schema,
30    error::Details,
31    schema::RecordSchema,
32    serde::{
33        ser_schema::record::field_default::SchemaAwareRecordFieldDefault, util::StringSerializer,
34    },
35};
36
37pub struct RecordSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
38    writer: &'w mut W,
39    record: &'s RecordSchema,
40    config: Config<'s, S>,
41    /// Cache fields received out-of-order
42    cache: HashMap<usize, Vec<u8>>,
43    /// The position of the current map entry being written
44    map_position: Option<usize>,
45    /// The field that should be written now.
46    field_position: usize,
47    bytes_written: usize,
48}
49
50impl<'s, 'w, W: Write, S: Borrow<Schema>> RecordSerializer<'s, 'w, W, S> {
51    pub fn new(
52        writer: &'w mut W,
53        record: &'s RecordSchema,
54        config: Config<'s, S>,
55        bytes_written: Option<usize>,
56    ) -> Self {
57        Self {
58            writer,
59            record,
60            config,
61            cache: HashMap::new(),
62            map_position: None,
63            field_position: 0,
64            bytes_written: bytes_written.unwrap_or(0),
65        }
66    }
67
68    fn field_error(&self, position: usize, error: Error) -> Error {
69        let field = &self.record.fields[position];
70        let error = match error.into_details() {
71            Details::SerializeValueWithSchema {
72                value_type,
73                value,
74                schema: _,
75            } => format!("Failed to serialize value of type `{value_type}`: {value}"),
76            Details::SerializeRecordFieldWithSchema {
77                field_name,
78                record_schema,
79                error,
80            } => format!(
81                "Failed to serialize field '{field_name}' of record {}: {error}",
82                record_schema.name
83            ),
84            Details::MissingDefaultForSkippedField { field_name, schema } => {
85                format!(
86                    "Missing default for skipped field '{field_name}' for record {}",
87                    schema.name
88                )
89            }
90            details => format!("{details:?}"),
91        };
92        Error::new(Details::SerializeRecordFieldWithSchema {
93            field_name: field.name.clone(),
94            record_schema: self.record.clone(),
95            error,
96        })
97    }
98
99    fn serialize_next_field<T: ?Sized + Serialize>(
100        &mut self,
101        position: usize,
102        value: &T,
103    ) -> Result<(), Error> {
104        let field = &self.record.fields[position];
105        match self.field_position.cmp(&position) {
106            Ordering::Equal => {
107                // Field received in the right order
108                self.bytes_written += value
109                    .serialize(SchemaAwareSerializer::new(
110                        self.writer,
111                        &field.schema,
112                        self.config,
113                    )?)
114                    .map_err(|e| self.field_error(self.field_position, e))?;
115                self.field_position += 1;
116
117                // Write any fields that were already received and can now be written
118                while let Some(bytes) = self.cache.remove(&self.field_position) {
119                    self.writer.write_all(&bytes).map_err(Details::WriteBytes)?;
120                    self.bytes_written += bytes.len();
121                    self.field_position += 1;
122                }
123
124                Ok(())
125            }
126            Ordering::Less => {
127                // Another field needs to be written first, so cache this field
128                let mut bytes = Vec::new();
129                value
130                    .serialize(SchemaAwareSerializer::new(
131                        &mut bytes,
132                        &field.schema,
133                        self.config,
134                    )?)
135                    .map_err(|e| self.field_error(position, e))?;
136                if self.cache.insert(position, bytes).is_some() {
137                    Err(Details::FieldNameDuplicate(field.name.clone()).into())
138                } else {
139                    Ok(())
140                }
141            }
142            Ordering::Greater => {
143                // This field is already written to the writer so we got a duplicate
144                Err(Details::FieldNameDuplicate(field.name.clone()).into())
145            }
146        }
147    }
148
149    fn serialize_default(&mut self, position: usize) -> Result<(), Error> {
150        let field = &self.record.fields[position];
151        if let Some(default) = &field.default {
152            self.serialize_next_field(
153                position,
154                &SchemaAwareRecordFieldDefault::new(default, &field.schema),
155            )
156            .map_err(|e| self.field_error(position, e))
157        } else {
158            Err(Details::MissingDefaultForSkippedField {
159                field_name: field.name.clone(),
160                schema: self.record.clone(),
161            }
162            .into())
163        }
164    }
165
166    fn end(mut self) -> Result<usize, Error> {
167        // Write any fields that were skipped by `#[serde(skip)]` or `#[serde(skip_serializing{,_if}]`
168        while self.field_position != self.record.fields.len() {
169            self.serialize_default(self.field_position)?;
170        }
171
172        Ok(self.bytes_written)
173    }
174}
175
176impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeStruct for RecordSerializer<'s, 'w, W, S> {
177    type Ok = usize;
178    type Error = Error;
179
180    fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
181    where
182        T: ?Sized + Serialize,
183    {
184        if let Some(position) = self.record.lookup.get(key).copied() {
185            self.serialize_next_field(position, value)
186        } else {
187            Err(Details::GetField(key.to_string()).into())
188        }
189    }
190
191    fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> {
192        if let Some(position) = self.record.lookup.get(key).copied() {
193            self.serialize_default(position)
194        } else {
195            Err(Details::GetField(key.to_string()).into())
196        }
197    }
198
199    fn end(self) -> Result<Self::Ok, Self::Error> {
200        self.end()
201    }
202}
203
204impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for RecordSerializer<'s, 'w, W, S> {
205    type Ok = usize;
206    type Error = Error;
207
208    fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
209    where
210        T: ?Sized + Serialize,
211    {
212        let name = key.serialize(StringSerializer)?;
213        if let Some(position) = self.record.lookup.get(&name).copied() {
214            self.map_position = Some(position);
215            Ok(())
216        } else {
217            Err(Details::FieldName(name.to_string()).into())
218        }
219    }
220
221    fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
222    where
223        T: ?Sized + Serialize,
224    {
225        self.serialize_next_field(
226            self.map_position
227                .expect("serialize_value called without calling serialize_key"),
228            value,
229        )
230    }
231
232    fn end(self) -> Result<Self::Ok, Self::Error> {
233        self.end()
234    }
235
236    fn serialize_entry<K, V>(&mut self, key: &K, value: &V) -> Result<(), Self::Error>
237    where
238        K: ?Sized + Serialize,
239        V: ?Sized + Serialize,
240    {
241        let name = key.serialize(StringSerializer)?;
242        if let Some(position) = self.record.lookup.get(&name).copied() {
243            self.serialize_next_field(position, value)
244        } else {
245            Err(Details::FieldName(name.to_string()).into())
246        }
247    }
248}
249
250impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeStructVariant
251    for RecordSerializer<'s, 'w, W, S>
252{
253    type Ok = usize;
254    type Error = Error;
255
256    fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
257    where
258        T: ?Sized + Serialize,
259    {
260        <Self as SerializeStruct>::serialize_field(self, key, value)
261    }
262
263    fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> {
264        <Self as SerializeStruct>::skip_field(self, key)
265    }
266
267    fn end(self) -> Result<Self::Ok, Self::Error> {
268        self.end()
269    }
270}