Skip to main content

apache_avro/serde/ser_schema/
block.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::{borrow::Borrow, io::Write};
19
20use serde::{
21    Serialize,
22    ser::{SerializeMap, SerializeSeq},
23};
24
25use super::{Config, SchemaAwareSerializer};
26use crate::{
27    Error, Schema,
28    error::Details,
29    schema::{ArraySchema, MapSchema},
30    util::zig_i64,
31};
32
33#[expect(
34    private_interfaces,
35    reason = "Direct and Buffered should not be used directly"
36)]
37pub enum BlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
38    Direct(DirectBlockSerializer<'s, 'w, W, S>),
39    Buffered(BufferedBlockSerializer<'s, 'w, W, S>),
40}
41
42impl<'s, 'w, W: Write, S: Borrow<Schema>> BlockSerializer<'s, 'w, W, S> {
43    pub fn array(
44        writer: &'w mut W,
45        schema: &'s ArraySchema,
46        config: Config<'s, S>,
47        len: Option<usize>,
48        bytes_written: Option<usize>,
49    ) -> Result<Self, Error> {
50        let schema = if let Schema::Ref { name } = schema.items.as_ref() {
51            config.get_schema(name)?
52        } else {
53            &schema.items
54        };
55
56        Self::new(writer, schema, config, len, bytes_written)
57    }
58
59    pub fn map(
60        writer: &'w mut W,
61        schema: &'s MapSchema,
62        config: Config<'s, S>,
63        len: Option<usize>,
64        bytes_written: Option<usize>,
65    ) -> Result<Self, Error> {
66        let schema = if let Schema::Ref { name } = schema.types.as_ref() {
67            config.get_schema(name)?
68        } else {
69            &schema.types
70        };
71
72        Self::new(writer, schema, config, len, bytes_written)
73    }
74
75    fn new(
76        writer: &'w mut W,
77        schema: &'s Schema,
78        config: Config<'s, S>,
79        len: Option<usize>,
80        bytes_written: Option<usize>,
81    ) -> Result<Self, Error> {
82        let bytes_written = bytes_written.unwrap_or(0);
83        if let Some(len) = len
84            && config.target_block_size.is_none()
85        {
86            Ok(Self::Direct(DirectBlockSerializer::new(
87                writer,
88                schema,
89                config,
90                len,
91                bytes_written,
92            )?))
93        } else {
94            let target_block_size = config.target_block_size.unwrap_or(1024);
95            Ok(Self::Buffered(BufferedBlockSerializer::new(
96                writer,
97                schema,
98                config,
99                target_block_size,
100                bytes_written,
101            )))
102        }
103    }
104}
105
106impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BlockSerializer<'s, 'w, W, S> {
107    type Ok = usize;
108    type Error = Error;
109
110    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
111    where
112        T: ?Sized + Serialize,
113    {
114        match self {
115            BlockSerializer::Direct(direct) => direct.serialize_element(value),
116            BlockSerializer::Buffered(buffered) => buffered.serialize_element(value),
117        }
118    }
119
120    fn end(self) -> Result<Self::Ok, Self::Error> {
121        match self {
122            BlockSerializer::Direct(direct) => direct.end(),
123            BlockSerializer::Buffered(buffered) => buffered.end(),
124        }
125    }
126}
127
128impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for BlockSerializer<'s, 'w, W, S> {
129    type Ok = usize;
130    type Error = Error;
131
132    fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
133    where
134        T: ?Sized + Serialize,
135    {
136        match self {
137            BlockSerializer::Direct(direct) => direct.serialize_key(key),
138            BlockSerializer::Buffered(buffered) => buffered.serialize_key(key),
139        }
140    }
141
142    fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
143    where
144        T: ?Sized + Serialize,
145    {
146        match self {
147            BlockSerializer::Direct(direct) => direct.serialize_value(value),
148            BlockSerializer::Buffered(buffered) => buffered.serialize_value(value),
149        }
150    }
151
152    fn end(self) -> Result<Self::Ok, Self::Error> {
153        match self {
154            BlockSerializer::Direct(direct) => direct.end(),
155            BlockSerializer::Buffered(buffered) => buffered.end(),
156        }
157    }
158}
159
160struct DirectBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
161    writer: &'w mut W,
162    schema: &'s Schema,
163    config: Config<'s, S>,
164    bytes_written: usize,
165}
166
167impl<'s, 'w, W: Write, S: Borrow<Schema>> DirectBlockSerializer<'s, 'w, W, S> {
168    pub fn new(
169        writer: &'w mut W,
170        schema: &'s Schema,
171        config: Config<'s, S>,
172        len: usize,
173        mut bytes_written: usize,
174    ) -> Result<Self, Error> {
175        if len != 0 {
176            // .end() always writes the zero block, so we only write the size for arrays
177            // that have at least one element
178            bytes_written += zig_i64(len as i64, &mut *writer)?;
179        }
180        Ok(Self {
181            writer,
182            schema,
183            config,
184            bytes_written,
185        })
186    }
187
188    fn end(self) -> Result<usize, Error> {
189        // Write the zero directly instead of through zig_i64 which does a lot of extra work
190        self.writer.write_all(&[0]).map_err(Details::WriteBytes)?;
191
192        Ok(self.bytes_written + 1)
193    }
194}
195
196impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for DirectBlockSerializer<'s, 'w, W, S> {
197    type Ok = usize;
198    type Error = Error;
199
200    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
201    where
202        T: ?Sized + Serialize,
203    {
204        self.bytes_written += value.serialize(SchemaAwareSerializer::new(
205            self.writer,
206            self.schema,
207            self.config,
208        )?)?;
209        Ok(())
210    }
211
212    fn end(self) -> Result<Self::Ok, Self::Error> {
213        self.end()
214    }
215}
216
217impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for DirectBlockSerializer<'s, 'w, W, S> {
218    type Ok = usize;
219    type Error = Error;
220
221    fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
222    where
223        T: ?Sized + Serialize,
224    {
225        self.bytes_written += key.serialize(SchemaAwareSerializer::new(
226            self.writer,
227            &Schema::String,
228            self.config,
229        )?)?;
230        Ok(())
231    }
232
233    fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
234    where
235        T: ?Sized + Serialize,
236    {
237        self.serialize_element(value)
238    }
239
240    fn end(self) -> Result<Self::Ok, Self::Error> {
241        self.end()
242    }
243}
244
245struct BufferedBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
246    writer: &'w mut W,
247    buffer: Vec<u8>,
248    schema: &'s Schema,
249    config: Config<'s, S>,
250    bytes_written: usize,
251    items_in_buffer: i64,
252    target_block_size: usize,
253}
254
255impl<'s, 'w, W: Write, S: Borrow<Schema>> BufferedBlockSerializer<'s, 'w, W, S> {
256    pub fn new(
257        writer: &'w mut W,
258        schema: &'s Schema,
259        config: Config<'s, S>,
260        target_block_size: usize,
261        bytes_written: usize,
262    ) -> Self {
263        Self {
264            writer,
265            buffer: Vec::with_capacity(target_block_size),
266            schema,
267            config,
268            bytes_written,
269            items_in_buffer: 0,
270            target_block_size,
271        }
272    }
273
274    /// Write a block including the items and size header.
275    fn write_block(&mut self) -> Result<(), Error> {
276        // Write the header, the negative item count indicates that the next value is the size of the
277        // block in bytes
278        self.bytes_written += zig_i64(0 - self.items_in_buffer, &mut *self.writer)?;
279        self.bytes_written += zig_i64(self.buffer.len() as i64, &mut *self.writer)?;
280
281        // Write the actual data
282        self.writer
283            .write_all(&self.buffer)
284            .map_err(Details::WriteBytes)?;
285        self.bytes_written += self.buffer.len();
286
287        // Reset the buffer
288        self.items_in_buffer = 0;
289        self.buffer.clear();
290
291        Ok(())
292    }
293
294    fn end(mut self) -> Result<usize, Error> {
295        // Write any items remaining in the buffer
296        if self.items_in_buffer > 0 {
297            self.write_block()?;
298        }
299        debug_assert_eq!(self.buffer.len(), 0, "Buffer must be empty at this point");
300
301        // Write the zero directly instead of through zig_i64 which does a lot of extra work
302        self.writer.write_all(&[0]).map_err(Details::WriteBytes)?;
303
304        Ok(self.bytes_written + 1)
305    }
306}
307
308impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BufferedBlockSerializer<'s, 'w, W, S> {
309    type Ok = usize;
310    type Error = Error;
311
312    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
313    where
314        T: ?Sized + Serialize,
315    {
316        value.serialize(SchemaAwareSerializer::new(
317            &mut self.buffer,
318            self.schema,
319            self.config,
320        )?)?;
321        self.items_in_buffer += 1;
322
323        if self.buffer.len() >= self.target_block_size {
324            self.write_block()?;
325        }
326        Ok(())
327    }
328
329    fn end(self) -> Result<Self::Ok, Self::Error> {
330        self.end()
331    }
332}
333
334impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for BufferedBlockSerializer<'s, 'w, W, S> {
335    type Ok = usize;
336    type Error = Error;
337
338    fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
339    where
340        T: ?Sized + Serialize,
341    {
342        key.serialize(SchemaAwareSerializer::new(
343            &mut self.buffer,
344            &Schema::String,
345            self.config,
346        )?)?;
347        Ok(())
348    }
349
350    fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
351    where
352        T: ?Sized + Serialize,
353    {
354        self.serialize_element(value)
355    }
356
357    fn end(self) -> Result<Self::Ok, Self::Error> {
358        self.end()
359    }
360}