apache_avro/serde/ser_schema/record/
mod.rs1mod field_default;
19
20use std::{borrow::Borrow, cmp::Ordering, collections::HashMap, io::Write};
21
22use serde::{
23 Serialize,
24 ser::{SerializeMap, SerializeStruct, SerializeStructVariant},
25};
26
27use super::{Config, SchemaAwareSerializer};
28use crate::{
29 Error, Schema,
30 error::Details,
31 schema::RecordSchema,
32 serde::{
33 ser_schema::record::field_default::SchemaAwareRecordFieldDefault, util::StringSerializer,
34 },
35};
36
37pub struct RecordSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
38 writer: &'w mut W,
39 record: &'s RecordSchema,
40 config: Config<'s, S>,
41 cache: HashMap<usize, Vec<u8>>,
43 map_position: Option<usize>,
45 field_position: usize,
47 bytes_written: usize,
48}
49
50impl<'s, 'w, W: Write, S: Borrow<Schema>> RecordSerializer<'s, 'w, W, S> {
51 pub fn new(
52 writer: &'w mut W,
53 record: &'s RecordSchema,
54 config: Config<'s, S>,
55 bytes_written: Option<usize>,
56 ) -> Self {
57 Self {
58 writer,
59 record,
60 config,
61 cache: HashMap::new(),
62 map_position: None,
63 field_position: 0,
64 bytes_written: bytes_written.unwrap_or(0),
65 }
66 }
67
68 fn field_error(&self, position: usize, error: Error) -> Error {
69 let field = &self.record.fields[position];
70 let error = match error.into_details() {
71 Details::SerializeValueWithSchema {
72 value_type,
73 value,
74 schema: _,
75 } => format!("Failed to serialize value of type `{value_type}`: {value}"),
76 Details::SerializeRecordFieldWithSchema {
77 field_name,
78 record_schema,
79 error,
80 } => format!(
81 "Failed to serialize field '{field_name}' of record {}: {error}",
82 record_schema.name
83 ),
84 Details::MissingDefaultForSkippedField { field_name, schema } => {
85 format!(
86 "Missing default for skipped field '{field_name}' for record {}",
87 schema.name
88 )
89 }
90 details => format!("{details:?}"),
91 };
92 Error::new(Details::SerializeRecordFieldWithSchema {
93 field_name: field.name.clone(),
94 record_schema: self.record.clone(),
95 error,
96 })
97 }
98
99 fn serialize_next_field<T: ?Sized + Serialize>(
100 &mut self,
101 position: usize,
102 value: &T,
103 ) -> Result<(), Error> {
104 let field = &self.record.fields[position];
105 match self.field_position.cmp(&position) {
106 Ordering::Equal => {
107 self.bytes_written += value
109 .serialize(SchemaAwareSerializer::new(
110 self.writer,
111 &field.schema,
112 self.config,
113 )?)
114 .map_err(|e| self.field_error(self.field_position, e))?;
115 self.field_position += 1;
116
117 while let Some(bytes) = self.cache.remove(&self.field_position) {
119 self.writer.write_all(&bytes).map_err(Details::WriteBytes)?;
120 self.bytes_written += bytes.len();
121 self.field_position += 1;
122 }
123
124 Ok(())
125 }
126 Ordering::Less => {
127 let mut bytes = Vec::new();
129 value
130 .serialize(SchemaAwareSerializer::new(
131 &mut bytes,
132 &field.schema,
133 self.config,
134 )?)
135 .map_err(|e| self.field_error(position, e))?;
136 if self.cache.insert(position, bytes).is_some() {
137 Err(Details::FieldNameDuplicate(field.name.clone()).into())
138 } else {
139 Ok(())
140 }
141 }
142 Ordering::Greater => {
143 Err(Details::FieldNameDuplicate(field.name.clone()).into())
145 }
146 }
147 }
148
149 fn serialize_default(&mut self, position: usize) -> Result<(), Error> {
150 let field = &self.record.fields[position];
151 if let Some(default) = &field.default {
152 self.serialize_next_field(
153 position,
154 &SchemaAwareRecordFieldDefault::new(default, &field.schema),
155 )
156 .map_err(|e| self.field_error(position, e))
157 } else {
158 Err(Details::MissingDefaultForSkippedField {
159 field_name: field.name.clone(),
160 schema: self.record.clone(),
161 }
162 .into())
163 }
164 }
165
166 fn end(mut self) -> Result<usize, Error> {
167 while self.field_position != self.record.fields.len() {
169 self.serialize_default(self.field_position)?;
170 }
171
172 Ok(self.bytes_written)
173 }
174}
175
176impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeStruct for RecordSerializer<'s, 'w, W, S> {
177 type Ok = usize;
178 type Error = Error;
179
180 fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
181 where
182 T: ?Sized + Serialize,
183 {
184 if let Some(position) = self.record.lookup.get(key).copied() {
185 self.serialize_next_field(position, value)
186 } else {
187 Err(Details::GetField(key.to_string()).into())
188 }
189 }
190
191 fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> {
192 if let Some(position) = self.record.lookup.get(key).copied() {
193 self.serialize_default(position)
194 } else {
195 Err(Details::GetField(key.to_string()).into())
196 }
197 }
198
199 fn end(self) -> Result<Self::Ok, Self::Error> {
200 self.end()
201 }
202}
203
204impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for RecordSerializer<'s, 'w, W, S> {
205 type Ok = usize;
206 type Error = Error;
207
208 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
209 where
210 T: ?Sized + Serialize,
211 {
212 let name = key.serialize(StringSerializer)?;
213 if let Some(position) = self.record.lookup.get(&name).copied() {
214 self.map_position = Some(position);
215 Ok(())
216 } else {
217 Err(Details::FieldName(name.to_string()).into())
218 }
219 }
220
221 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
222 where
223 T: ?Sized + Serialize,
224 {
225 self.serialize_next_field(
226 self.map_position
227 .expect("serialize_value called without calling serialize_key"),
228 value,
229 )
230 }
231
232 fn end(self) -> Result<Self::Ok, Self::Error> {
233 self.end()
234 }
235
236 fn serialize_entry<K, V>(&mut self, key: &K, value: &V) -> Result<(), Self::Error>
237 where
238 K: ?Sized + Serialize,
239 V: ?Sized + Serialize,
240 {
241 let name = key.serialize(StringSerializer)?;
242 if let Some(position) = self.record.lookup.get(&name).copied() {
243 self.serialize_next_field(position, value)
244 } else {
245 Err(Details::FieldName(name.to_string()).into())
246 }
247 }
248}
249
250impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeStructVariant
251 for RecordSerializer<'s, 'w, W, S>
252{
253 type Ok = usize;
254 type Error = Error;
255
256 fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
257 where
258 T: ?Sized + Serialize,
259 {
260 <Self as SerializeStruct>::serialize_field(self, key, value)
261 }
262
263 fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> {
264 <Self as SerializeStruct>::skip_field(self, key)
265 }
266
267 fn end(self) -> Result<Self::Ok, Self::Error> {
268 self.end()
269 }
270}