1use std::{io::Write, marker::PhantomData, ops::RangeInclusive};
19
20use bon::Builder;
21use serde::Serialize;
22
23use crate::Error;
24use crate::encode::encode_internal;
25use crate::serde::ser_schema::{Config, SchemaAwareSerializer};
26use crate::util::is_human_readable;
27use crate::{
28 AvroResult, AvroSchema, Schema,
29 error::Details,
30 headers::{HeaderBuilder, RabinFingerprintHeader},
31 schema::ResolvedOwnedSchema,
32 types::Value,
33};
34
35pub struct GenericSingleObjectWriter {
39 buffer: Vec<u8>,
40 resolved: ResolvedOwnedSchema,
41}
42
43impl GenericSingleObjectWriter {
44 pub fn new_with_capacity(
45 schema: &Schema,
46 initial_buffer_cap: usize,
47 ) -> AvroResult<GenericSingleObjectWriter> {
48 let header_builder = RabinFingerprintHeader::from_schema(schema);
49 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, &header_builder)
50 }
51
52 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
53 schema: &Schema,
54 initial_buffer_cap: usize,
55 header_builder: &HB,
56 ) -> AvroResult<GenericSingleObjectWriter> {
57 let mut buffer = Vec::with_capacity(initial_buffer_cap);
58 let header = header_builder.build_header();
59 buffer.extend_from_slice(&header);
60
61 Ok(GenericSingleObjectWriter {
62 buffer,
63 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
64 })
65 }
66
67 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
68
69 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
71 let original_length = self.buffer.len();
72 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
73 Err(Details::IllegalSingleObjectWriterState.into())
74 } else {
75 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
76 writer
77 .write_all(&self.buffer)
78 .map_err(Details::WriteBytes)?;
79 let len = self.buffer.len();
80 self.buffer.truncate(original_length);
81 Ok(len)
82 }
83 }
84
85 #[expect(
87 clippy::needless_pass_by_value,
88 reason = "That's the whole point of this wrapper function"
89 )]
90 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
91 self.write_value_ref(&v, writer)
92 }
93}
94
95#[derive(Builder)]
97pub struct SpecificSingleObjectWriter<T>
98where
99 T: AvroSchema,
100{
101 #[builder(
102 with = |schema: Schema| -> Result<_, Error> { ResolvedOwnedSchema::new(schema) },
103 default = ResolvedOwnedSchema::new(T::get_schema()).expect("AvroSchema implementation should create valid schemas")
104 )]
105 resolved: ResolvedOwnedSchema,
106 #[builder(
107 default = RabinFingerprintHeader::from_schema(resolved.get_root_schema()).build_header(),
108 with = |header_builder: impl HeaderBuilder| header_builder.build_header(),
109 )]
110 header: Vec<u8>,
111 #[builder(default = is_human_readable())]
115 human_readable: bool,
116 target_block_size: Option<usize>,
124 #[builder(skip)]
125 _model: PhantomData<T>,
126}
127
128impl<T> SpecificSingleObjectWriter<T>
129where
130 T: AvroSchema,
131{
132 pub fn new() -> AvroResult<Self> {
133 let schema = T::get_schema();
134 let header = RabinFingerprintHeader::from_schema(&schema).build_header();
135 let resolved = ResolvedOwnedSchema::new(schema)?;
136 Ok(Self {
138 resolved,
139 header,
140 human_readable: is_human_readable(),
141 target_block_size: None,
142 _model: PhantomData,
143 })
144 }
145
146 pub fn new_with_header_builder(header_builder: &impl HeaderBuilder) -> AvroResult<Self> {
147 let header = header_builder.build_header();
148 let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
149 Ok(Self {
150 resolved,
151 header,
152 human_readable: is_human_readable(),
153 target_block_size: None,
154 _model: PhantomData,
155 })
156 }
157
158 #[deprecated(since = "0.22.0", note = "Use new() instead")]
160 pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
161 Self::new()
162 }
163}
164
165impl<T> SpecificSingleObjectWriter<T>
166where
167 T: AvroSchema + Into<Value>,
168{
169 pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
176 writer
177 .write_all(&self.header)
178 .map_err(Details::WriteBytes)?;
179 let value: Value = data.into();
180 let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
181 Ok(bytes + self.header.len())
182 }
183}
184
185impl<T> SpecificSingleObjectWriter<T>
186where
187 T: AvroSchema + Serialize,
188{
189 pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
196 writer
197 .write_all(&self.header)
198 .map_err(Details::WriteBytes)?;
199
200 let config = Config {
201 names: self.resolved.get_names(),
202 target_block_size: self.target_block_size,
203 human_readable: self.human_readable,
204 };
205
206 let bytes = data.serialize(SchemaAwareSerializer::new(
207 writer,
208 self.resolved.get_root_schema(),
209 config,
210 )?)?;
211
212 Ok(bytes + self.header.len())
213 }
214
215 #[expect(
222 clippy::needless_pass_by_value,
223 reason = "That's the whole point of this wrapper function"
224 )]
225 pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
226 self.write_ref(&data, writer)
227 }
228}
229
230fn write_value_ref_owned_resolved<W: Write>(
231 resolved_schema: &ResolvedOwnedSchema,
232 value: &Value,
233 writer: &mut W,
234) -> AvroResult<usize> {
235 let root_schema = resolved_schema.get_root_schema();
236 if let Some(reason) = value.validate_internal(
237 root_schema,
238 resolved_schema.get_names(),
239 root_schema.namespace(),
240 ) {
241 return Err(Details::ValidationWithReason {
242 value: value.clone(),
243 schema: root_schema.clone(),
244 reason,
245 }
246 .into());
247 }
248 encode_internal(
249 value,
250 root_schema,
251 resolved_schema.get_names(),
252 root_schema.namespace(),
253 writer,
254 )
255}
256
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use uuid::Uuid;

    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin};

    use super::*;

    /// Record fixture shared by the writer tests.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl TestSingleObjectWriter {
        /// The instance every test in this module writes.
        fn sample() -> Self {
            TestSingleObjectWriter {
                a: 300,
                b: 34.555,
                c: vec!["cat".into(), "dog".into()],
            }
        }
    }

    impl AvroSchema for TestSingleObjectWriter {
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    impl From<TestSingleObjectWriter> for Value {
        fn from(obj: TestSingleObjectWriter) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    /// The generic writer emits the 0xC3 0x01 marker, the Rabin fingerprint of
    /// the schema, and then exactly the bytes `encode` produces.
    #[test]
    fn test_single_object_writer() -> TestResult {
        let mut buf: Vec<u8> = Vec::new();
        let schema = TestSingleObjectWriter::get_schema();
        let mut writer = GenericSingleObjectWriter::new_with_capacity(&schema, 1024)
            .expect("Should resolve schema");
        let value: Value = TestSingleObjectWriter::sample().into();
        let written_bytes = writer
            .write_value_ref(&value, &mut buf)
            .expect("Error serializing properly");

        assert!(buf.len() > 10, "no bytes written");
        assert_eq!(buf.len(), written_bytes);
        assert_eq!(buf[0], 0xC3);
        assert_eq!(buf[1], 0x01);
        assert_eq!(
            &buf[2..10],
            &TestSingleObjectWriter::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..]
        );

        let mut msg_binary = Vec::new();
        encode(
            &value,
            &TestSingleObjectWriter::get_schema(),
            &mut msg_binary,
        )
        .expect("encode should have failed by here as a dependency of any writing");
        assert_eq!(&buf[10..], &msg_binary[..]);

        Ok(())
    }

    /// A custom header builder replaces the default Rabin header.
    #[test]
    fn test_single_object_writer_with_header_builder() -> TestResult {
        let mut buf: Vec<u8> = Vec::new();
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
            &TestSingleObjectWriter::get_schema(),
            1024,
            &header_builder,
        )
        .expect("Should resolve schema");
        let value: Value = TestSingleObjectWriter::sample().into();
        writer
            .write_value_ref(&value, &mut buf)
            .expect("Error serializing properly");

        assert_eq!(buf[0], 0x03);
        assert_eq!(buf[1], 0x00);
        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
        Ok(())
    }

    /// Every write path (specific by-reference, specific by-value, specific
    /// via `Value`, and generic) must produce identical bytes.
    #[test]
    fn test_writer_parity() -> TestResult {
        let obj1 = TestSingleObjectWriter::sample();

        let mut buf1: Vec<u8> = Vec::new();
        let mut buf2: Vec<u8> = Vec::new();
        let mut buf3: Vec<u8> = Vec::new();
        let mut buf4: Vec<u8> = Vec::new();

        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
            &TestSingleObjectWriter::get_schema(),
            1024,
        )
        .expect("Should resolve schema");
        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
            .expect("Resolved should pass");

        specific_writer
            .write_ref(&obj1, &mut buf1)
            .expect("Serialization expected");
        // Exercise the by-value wrapper as well, not `write_ref` a second time.
        specific_writer
            .write(obj1.clone(), &mut buf2)
            .expect("Serialization expected");
        specific_writer
            .write_value(obj1.clone(), &mut buf3)
            .expect("Serialization expected");

        generic_writer
            .write_value(obj1.into(), &mut buf4)
            .expect("Serialization expected");

        assert_eq!(buf1, buf2);
        assert_eq!(buf2, buf3);
        assert_eq!(buf3, buf4);

        Ok(())
    }

    /// A self-referencing record schema can be written through the serde path.
    #[test]
    fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
        #[derive(Serialize)]
        struct Recursive {
            field: bool,
            recurse: Option<Box<Recursive>>,
        }

        impl AvroSchema for Recursive {
            fn get_schema() -> Schema {
                Schema::parse_str(
                    r#"{
                        "name": "Recursive",
                        "type": "record",
                        "fields": [
                            { "name": "field", "type": "boolean" },
                            { "name": "recurse", "type": ["null", "Recursive"] }
                        ]
                    }"#,
                )
                .unwrap()
            }
        }

        let mut buffer = Vec::new();
        let writer = SpecificSingleObjectWriter::new()?;

        writer.write(
            Recursive {
                field: true,
                recurse: Some(Box::new(Recursive {
                    field: false,
                    recurse: None,
                })),
            },
            &mut buffer,
        )?;
        assert_eq!(
            buffer,
            &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
        );

        Ok(())
    }
}