Skip to main content

apache_avro/writer/
single_object.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Write, marker::PhantomData, ops::RangeInclusive};
19
20use bon::Builder;
21use serde::Serialize;
22
23use crate::Error;
24use crate::encode::encode_internal;
25use crate::serde::ser_schema::{Config, SchemaAwareSerializer};
26use crate::util::is_human_readable;
27use crate::{
28    AvroResult, AvroSchema, Schema,
29    error::Details,
30    headers::{HeaderBuilder, RabinFingerprintHeader},
31    schema::ResolvedOwnedSchema,
32    types::Value,
33};
34
/// Writer that encodes [`Value`]s according to the single object encoding v1 spec.
///
/// Uses an API similar to the current File Writer. Each message (header followed by the
/// encoded value) is assembled in an internal buffer and written to the target with a single
/// `write_all` call.
pub struct GenericSingleObjectWriter {
    /// Starts with the message header; the encoded value is appended after it while a
    /// message is being written.
    buffer: Vec<u8>,
    /// Resolved form of the schema that written values are validated and encoded against.
    resolved: ResolvedOwnedSchema,
}
42
43impl GenericSingleObjectWriter {
44    pub fn new_with_capacity(
45        schema: &Schema,
46        initial_buffer_cap: usize,
47    ) -> AvroResult<GenericSingleObjectWriter> {
48        let header_builder = RabinFingerprintHeader::from_schema(schema);
49        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, &header_builder)
50    }
51
52    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
53        schema: &Schema,
54        initial_buffer_cap: usize,
55        header_builder: &HB,
56    ) -> AvroResult<GenericSingleObjectWriter> {
57        let mut buffer = Vec::with_capacity(initial_buffer_cap);
58        let header = header_builder.build_header();
59        buffer.extend_from_slice(&header);
60
61        Ok(GenericSingleObjectWriter {
62            buffer,
63            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
64        })
65    }
66
67    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
68
69    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
70    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
71        let original_length = self.buffer.len();
72        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
73            Err(Details::IllegalSingleObjectWriterState.into())
74        } else {
75            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
76            writer
77                .write_all(&self.buffer)
78                .map_err(Details::WriteBytes)?;
79            let len = self.buffer.len();
80            self.buffer.truncate(original_length);
81            Ok(len)
82        }
83    }
84
85    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
86    #[expect(
87        clippy::needless_pass_by_value,
88        reason = "That's the whole point of this wrapper function"
89    )]
90    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
91        self.write_value_ref(&v, writer)
92    }
93}
94
/// Writer that encodes messages according to the single object encoding v1 spec.
///
/// By default the writer uses `T::get_schema()` and prefixes every message with the Rabin
/// fingerprint header of that schema. The derived builder can override the schema, the header
/// and the serialization options.
#[derive(Builder)]
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Resolved schema that written values are validated and encoded against.
    ///
    /// Defaults to the resolved `T::get_schema()`. The builder setter takes a [`Schema`] and
    /// returns an error if it cannot be resolved.
    #[builder(
        with = |schema: Schema| -> Result<_, Error> { ResolvedOwnedSchema::new(schema) },
        default = ResolvedOwnedSchema::new(T::get_schema()).expect("AvroSchema implementation should create valid schemas")
    )]
    resolved: ResolvedOwnedSchema,
    /// Header bytes written in front of every message.
    ///
    /// Defaults to the Rabin fingerprint header of the resolved root schema. The builder setter
    /// accepts any [`HeaderBuilder`] and stores the header it builds.
    #[builder(
        default = RabinFingerprintHeader::from_schema(resolved.get_root_schema()).build_header(),
        with = |header_builder: impl HeaderBuilder| header_builder.build_header(),
    )]
    header: Vec<u8>,
    /// Should [`Serialize`] implementations pick a human readable representation.
    ///
    /// It is recommended to set this to `false`.
    #[builder(default = is_human_readable())]
    human_readable: bool,
    /// At what block size to start a new block (for arrays and maps).
    ///
    /// This is a minimum value, the block size will always be larger than this except for the last
    /// block.
    ///
    /// When set to `None` all values will be written in a single block. This can be faster as no
    /// intermediate buffer is used, but seeking through written data will be slower.
    target_block_size: Option<usize>,
    // Ties the writer to `T` without storing a `T`.
    #[builder(skip)]
    _model: PhantomData<T>,
}
127
128impl<T> SpecificSingleObjectWriter<T>
129where
130    T: AvroSchema,
131{
132    pub fn new() -> AvroResult<Self> {
133        let schema = T::get_schema();
134        let header = RabinFingerprintHeader::from_schema(&schema).build_header();
135        let resolved = ResolvedOwnedSchema::new(schema)?;
136        // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice
137        Ok(Self {
138            resolved,
139            header,
140            human_readable: is_human_readable(),
141            target_block_size: None,
142            _model: PhantomData,
143        })
144    }
145
146    pub fn new_with_header_builder(header_builder: &impl HeaderBuilder) -> AvroResult<Self> {
147        let header = header_builder.build_header();
148        let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
149        Ok(Self {
150            resolved,
151            header,
152            human_readable: is_human_readable(),
153            target_block_size: None,
154            _model: PhantomData,
155        })
156    }
157
158    /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead.
159    #[deprecated(since = "0.22.0", note = "Use new() instead")]
160    pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
161        Self::new()
162    }
163}
164
165impl<T> SpecificSingleObjectWriter<T>
166where
167    T: AvroSchema + Into<Value>,
168{
169    /// Write the value to the writer
170    ///
171    /// Returns the number of bytes written.
172    ///
173    /// Each call writes a complete single-object encoded message (header + data),
174    /// making each message independently decodable.
175    pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
176        writer
177            .write_all(&self.header)
178            .map_err(Details::WriteBytes)?;
179        let value: Value = data.into();
180        let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
181        Ok(bytes + self.header.len())
182    }
183}
184
185impl<T> SpecificSingleObjectWriter<T>
186where
187    T: AvroSchema + Serialize,
188{
189    /// Write the object to the writer.
190    ///
191    /// Returns the number of bytes written.
192    ///
193    /// Each call writes a complete single-object encoded message (header + data),
194    /// making each message independently decodable.
195    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
196        writer
197            .write_all(&self.header)
198            .map_err(Details::WriteBytes)?;
199
200        let config = Config {
201            names: self.resolved.get_names(),
202            target_block_size: self.target_block_size,
203            human_readable: self.human_readable,
204        };
205
206        let bytes = data.serialize(SchemaAwareSerializer::new(
207            writer,
208            self.resolved.get_root_schema(),
209            config,
210        )?)?;
211
212        Ok(bytes + self.header.len())
213    }
214
215    /// Write the object to the writer.
216    ///
217    /// Returns the number of bytes written.
218    ///
219    /// Each call writes a complete single-object encoded message (header + data),
220    /// making each message independently decodable.
221    #[expect(
222        clippy::needless_pass_by_value,
223        reason = "That's the whole point of this wrapper function"
224    )]
225    pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
226        self.write_ref(&data, writer)
227    }
228}
229
230fn write_value_ref_owned_resolved<W: Write>(
231    resolved_schema: &ResolvedOwnedSchema,
232    value: &Value,
233    writer: &mut W,
234) -> AvroResult<usize> {
235    let root_schema = resolved_schema.get_root_schema();
236    if let Some(reason) = value.validate_internal(
237        root_schema,
238        resolved_schema.get_names(),
239        root_schema.namespace(),
240    ) {
241        return Err(Details::ValidationWithReason {
242            value: value.clone(),
243            schema: root_schema.clone(),
244            reason,
245        }
246        .into());
247    }
248    encode_internal(
249        value,
250        root_schema,
251        resolved_schema.get_names(),
252        root_schema.namespace(),
253        writer,
254    )
255}
256
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use uuid::Uuid;

    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin};

    use super::*;

    // Fixture used by the writer tests. It is written either via serde (`Serialize`) or after
    // conversion to a `Value` through the `From` impl below.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectWriter {
        // Record schema mirroring the struct: `a: long`, `b: double`, `c: array<string>`.
        // NOTE(review): "Wrtier" in the record name looks like a typo. Renaming it changes the
        // schema fingerprint, but no assertion in this module hard-codes that fingerprint.
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    // Manual `Value` conversion (field order matches the schema) used by the `Value`-based
    // write paths.
    impl From<TestSingleObjectWriter> for Value {
        fn from(obj: TestSingleObjectWriter) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    // The generic writer must emit: marker C3 01, the 8-byte Rabin fingerprint of the schema,
    // then exactly the plain binary encoding of the value. The returned length covers it all.
    #[test]
    fn test_single_object_writer() -> TestResult {
        let mut buf: Vec<u8> = Vec::new();
        let obj = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut writer = GenericSingleObjectWriter::new_with_capacity(
            &TestSingleObjectWriter::get_schema(),
            1024,
        )
        .expect("Should resolve schema");
        let value = obj.into();
        let written_bytes = writer
            .write_value_ref(&value, &mut buf)
            .expect("Error serializing properly");

        assert!(buf.len() > 10, "no bytes written");
        assert_eq!(buf.len(), written_bytes);
        assert_eq!(buf[0], 0xC3);
        assert_eq!(buf[1], 0x01);
        assert_eq!(
            &buf[2..10],
            &TestSingleObjectWriter::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..]
        );
        // The payload after the 10-byte header must match a standalone `encode` of the value.
        let mut msg_binary = Vec::new();
        encode(
            &value,
            &TestSingleObjectWriter::get_schema(),
            &mut msg_binary,
        )
        .expect("encode should have failed by here as a dependency of any writing");
        assert_eq!(&buf[10..], &msg_binary[..]);

        Ok(())
    }

    // With a Glue schema-UUID header, the message starts with 03 00 followed by the 16 raw
    // UUID bytes.
    #[test]
    fn test_single_object_writer_with_header_builder() -> TestResult {
        let mut buf: Vec<u8> = Vec::new();
        let obj = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
            &TestSingleObjectWriter::get_schema(),
            1024,
            &header_builder,
        )
        .expect("Should resolve schema");
        let value = obj.into();
        writer
            .write_value_ref(&value, &mut buf)
            .expect("Error serializing properly");

        assert_eq!(buf[0], 0x03);
        assert_eq!(buf[1], 0x00);
        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
        Ok(())
    }

    // Every write path must emit byte-identical messages for the same object: serde by
    // reference (twice, to show the specific writer keeps no per-call state), `Value` via the
    // specific writer, and the generic writer.
    #[test]
    fn test_writer_parity() -> TestResult {
        let obj1 = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut buf1: Vec<u8> = Vec::new();
        let mut buf2: Vec<u8> = Vec::new();
        let mut buf3: Vec<u8> = Vec::new();
        let mut buf4: Vec<u8> = Vec::new();

        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
            &TestSingleObjectWriter::get_schema(),
            1024,
        )
        .expect("Should resolve schema");
        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
            .expect("Resolved should pass");
        specific_writer
            .write_ref(&obj1, &mut buf1)
            .expect("Serialization expected");
        specific_writer
            .write_ref(&obj1, &mut buf2)
            .expect("Serialization expected");
        specific_writer
            .write_value(obj1.clone(), &mut buf3)
            .expect("Serialization expected");

        generic_writer
            .write_value(obj1.into(), &mut buf4)
            .expect("Serialization expected");

        assert_eq!(buf1, buf2);
        assert_eq!(buf2, buf3);
        assert_eq!(buf3, buf4);

        Ok(())
    }

    // Regression test (avro_rs_439): serde-based writing of a value whose schema refers to
    // itself (`Recursive` appears inside its own union field).
    #[test]
    fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
        #[derive(Serialize)]
        struct Recursive {
            field: bool,
            recurse: Option<Box<Recursive>>,
        }

        impl AvroSchema for Recursive {
            fn get_schema() -> Schema {
                Schema::parse_str(
                    r#"{
                    "name": "Recursive",
                    "type": "record",
                    "fields": [
                        { "name": "field", "type": "boolean" },
                        { "name": "recurse", "type": ["null", "Recursive"] }
                    ]
                }"#,
                )
                .unwrap()
            }
        }

        let mut buffer = Vec::new();
        let writer = SpecificSingleObjectWriter::new()?;

        writer.write(
            Recursive {
                field: true,
                recurse: Some(Box::new(Recursive {
                    field: false,
                    recurse: None,
                })),
            },
            &mut buffer,
        )?;
        // Marker C3 01 (195, 1), 8 fingerprint bytes, then: true (1), union branch 1 (2),
        // false (0), union branch 0 / null (0).
        assert_eq!(
            buffer,
            &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
        );

        Ok(())
    }
}