1use std::{borrow::Borrow, io::Write};
19
20use serde::{
21 Serialize,
22 ser::{SerializeMap, SerializeSeq},
23};
24
25use super::{Config, SchemaAwareSerializer};
26use crate::{
27 Error, Schema,
28 error::Details,
29 schema::{ArraySchema, MapSchema},
30 util::zig_i64,
31};
32
33#[expect(
34 private_interfaces,
35 reason = "Direct and Buffered should not be used directly"
36)]
37pub enum BlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
38 Direct(DirectBlockSerializer<'s, 'w, W, S>),
39 Buffered(BufferedBlockSerializer<'s, 'w, W, S>),
40}
41
42impl<'s, 'w, W: Write, S: Borrow<Schema>> BlockSerializer<'s, 'w, W, S> {
43 pub fn array(
44 writer: &'w mut W,
45 schema: &'s ArraySchema,
46 config: Config<'s, S>,
47 len: Option<usize>,
48 bytes_written: Option<usize>,
49 ) -> Result<Self, Error> {
50 let schema = if let Schema::Ref { name } = schema.items.as_ref() {
51 config.get_schema(name)?
52 } else {
53 &schema.items
54 };
55
56 Self::new(writer, schema, config, len, bytes_written)
57 }
58
59 pub fn map(
60 writer: &'w mut W,
61 schema: &'s MapSchema,
62 config: Config<'s, S>,
63 len: Option<usize>,
64 bytes_written: Option<usize>,
65 ) -> Result<Self, Error> {
66 let schema = if let Schema::Ref { name } = schema.types.as_ref() {
67 config.get_schema(name)?
68 } else {
69 &schema.types
70 };
71
72 Self::new(writer, schema, config, len, bytes_written)
73 }
74
75 fn new(
76 writer: &'w mut W,
77 schema: &'s Schema,
78 config: Config<'s, S>,
79 len: Option<usize>,
80 bytes_written: Option<usize>,
81 ) -> Result<Self, Error> {
82 let bytes_written = bytes_written.unwrap_or(0);
83 if let Some(len) = len
84 && config.target_block_size.is_none()
85 {
86 Ok(Self::Direct(DirectBlockSerializer::new(
87 writer,
88 schema,
89 config,
90 len,
91 bytes_written,
92 )?))
93 } else {
94 let target_block_size = config.target_block_size.unwrap_or(1024);
95 Ok(Self::Buffered(BufferedBlockSerializer::new(
96 writer,
97 schema,
98 config,
99 target_block_size,
100 bytes_written,
101 )))
102 }
103 }
104}
105
106impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BlockSerializer<'s, 'w, W, S> {
107 type Ok = usize;
108 type Error = Error;
109
110 fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
111 where
112 T: ?Sized + Serialize,
113 {
114 match self {
115 BlockSerializer::Direct(direct) => direct.serialize_element(value),
116 BlockSerializer::Buffered(buffered) => buffered.serialize_element(value),
117 }
118 }
119
120 fn end(self) -> Result<Self::Ok, Self::Error> {
121 match self {
122 BlockSerializer::Direct(direct) => direct.end(),
123 BlockSerializer::Buffered(buffered) => buffered.end(),
124 }
125 }
126}
127
128impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for BlockSerializer<'s, 'w, W, S> {
129 type Ok = usize;
130 type Error = Error;
131
132 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
133 where
134 T: ?Sized + Serialize,
135 {
136 match self {
137 BlockSerializer::Direct(direct) => direct.serialize_key(key),
138 BlockSerializer::Buffered(buffered) => buffered.serialize_key(key),
139 }
140 }
141
142 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
143 where
144 T: ?Sized + Serialize,
145 {
146 match self {
147 BlockSerializer::Direct(direct) => direct.serialize_value(value),
148 BlockSerializer::Buffered(buffered) => buffered.serialize_value(value),
149 }
150 }
151
152 fn end(self) -> Result<Self::Ok, Self::Error> {
153 match self {
154 BlockSerializer::Direct(direct) => direct.end(),
155 BlockSerializer::Buffered(buffered) => buffered.end(),
156 }
157 }
158}
159
160struct DirectBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
161 writer: &'w mut W,
162 schema: &'s Schema,
163 config: Config<'s, S>,
164 bytes_written: usize,
165}
166
167impl<'s, 'w, W: Write, S: Borrow<Schema>> DirectBlockSerializer<'s, 'w, W, S> {
168 pub fn new(
169 writer: &'w mut W,
170 schema: &'s Schema,
171 config: Config<'s, S>,
172 len: usize,
173 mut bytes_written: usize,
174 ) -> Result<Self, Error> {
175 if len != 0 {
176 bytes_written += zig_i64(len as i64, &mut *writer)?;
179 }
180 Ok(Self {
181 writer,
182 schema,
183 config,
184 bytes_written,
185 })
186 }
187
188 fn end(self) -> Result<usize, Error> {
189 self.writer.write_all(&[0]).map_err(Details::WriteBytes)?;
191
192 Ok(self.bytes_written + 1)
193 }
194}
195
196impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for DirectBlockSerializer<'s, 'w, W, S> {
197 type Ok = usize;
198 type Error = Error;
199
200 fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
201 where
202 T: ?Sized + Serialize,
203 {
204 self.bytes_written += value.serialize(SchemaAwareSerializer::new(
205 self.writer,
206 self.schema,
207 self.config,
208 )?)?;
209 Ok(())
210 }
211
212 fn end(self) -> Result<Self::Ok, Self::Error> {
213 self.end()
214 }
215}
216
217impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for DirectBlockSerializer<'s, 'w, W, S> {
218 type Ok = usize;
219 type Error = Error;
220
221 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
222 where
223 T: ?Sized + Serialize,
224 {
225 self.bytes_written += key.serialize(SchemaAwareSerializer::new(
226 self.writer,
227 &Schema::String,
228 self.config,
229 )?)?;
230 Ok(())
231 }
232
233 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
234 where
235 T: ?Sized + Serialize,
236 {
237 self.serialize_element(value)
238 }
239
240 fn end(self) -> Result<Self::Ok, Self::Error> {
241 self.end()
242 }
243}
244
245struct BufferedBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> {
246 writer: &'w mut W,
247 buffer: Vec<u8>,
248 schema: &'s Schema,
249 config: Config<'s, S>,
250 bytes_written: usize,
251 items_in_buffer: i64,
252 target_block_size: usize,
253}
254
255impl<'s, 'w, W: Write, S: Borrow<Schema>> BufferedBlockSerializer<'s, 'w, W, S> {
256 pub fn new(
257 writer: &'w mut W,
258 schema: &'s Schema,
259 config: Config<'s, S>,
260 target_block_size: usize,
261 bytes_written: usize,
262 ) -> Self {
263 Self {
264 writer,
265 buffer: Vec::with_capacity(target_block_size),
266 schema,
267 config,
268 bytes_written,
269 items_in_buffer: 0,
270 target_block_size,
271 }
272 }
273
274 fn write_block(&mut self) -> Result<(), Error> {
276 self.bytes_written += zig_i64(0 - self.items_in_buffer, &mut *self.writer)?;
279 self.bytes_written += zig_i64(self.buffer.len() as i64, &mut *self.writer)?;
280
281 self.writer
283 .write_all(&self.buffer)
284 .map_err(Details::WriteBytes)?;
285 self.bytes_written += self.buffer.len();
286
287 self.items_in_buffer = 0;
289 self.buffer.clear();
290
291 Ok(())
292 }
293
294 fn end(mut self) -> Result<usize, Error> {
295 if self.items_in_buffer > 0 {
297 self.write_block()?;
298 }
299 debug_assert_eq!(self.buffer.len(), 0, "Buffer must be empty at this point");
300
301 self.writer.write_all(&[0]).map_err(Details::WriteBytes)?;
303
304 Ok(self.bytes_written + 1)
305 }
306}
307
308impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BufferedBlockSerializer<'s, 'w, W, S> {
309 type Ok = usize;
310 type Error = Error;
311
312 fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
313 where
314 T: ?Sized + Serialize,
315 {
316 value.serialize(SchemaAwareSerializer::new(
317 &mut self.buffer,
318 self.schema,
319 self.config,
320 )?)?;
321 self.items_in_buffer += 1;
322
323 if self.buffer.len() >= self.target_block_size {
324 self.write_block()?;
325 }
326 Ok(())
327 }
328
329 fn end(self) -> Result<Self::Ok, Self::Error> {
330 self.end()
331 }
332}
333
334impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for BufferedBlockSerializer<'s, 'w, W, S> {
335 type Ok = usize;
336 type Error = Error;
337
338 fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
339 where
340 T: ?Sized + Serialize,
341 {
342 key.serialize(SchemaAwareSerializer::new(
343 &mut self.buffer,
344 &Schema::String,
345 self.config,
346 )?)?;
347 Ok(())
348 }
349
350 fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
351 where
352 T: ?Sized + Serialize,
353 {
354 self.serialize_element(value)
355 }
356
357 fn end(self) -> Result<Self::Ok, Self::Error> {
358 self.end()
359 }
360}