apache_avro/writer/
single_object.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{io::Write, marker::PhantomData, ops::RangeInclusive};
19
20use serde::Serialize;
21
22use crate::encode::encode_internal;
23use crate::serde::ser_schema::SchemaAwareWriteSerializer;
24use crate::{
25    AvroResult, AvroSchema, Schema,
26    error::Details,
27    headers::{HeaderBuilder, RabinFingerprintHeader},
28    schema::ResolvedOwnedSchema,
29    types::Value,
30};
31
32/// Writer that encodes messages according to the single object encoding v1 spec
33/// Uses an API similar to the current File Writer
34/// Writes all object bytes at once, and drains internal buffer
35pub struct GenericSingleObjectWriter {
36    buffer: Vec<u8>,
37    resolved: ResolvedOwnedSchema,
38}
39
40impl GenericSingleObjectWriter {
41    pub fn new_with_capacity(
42        schema: &Schema,
43        initial_buffer_cap: usize,
44    ) -> AvroResult<GenericSingleObjectWriter> {
45        let header_builder = RabinFingerprintHeader::from_schema(schema);
46        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
47    }
48
49    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
50        schema: &Schema,
51        initial_buffer_cap: usize,
52        header_builder: HB,
53    ) -> AvroResult<GenericSingleObjectWriter> {
54        let mut buffer = Vec::with_capacity(initial_buffer_cap);
55        let header = header_builder.build_header();
56        buffer.extend_from_slice(&header);
57
58        Ok(GenericSingleObjectWriter {
59            buffer,
60            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
61        })
62    }
63
64    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
65
66    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
67    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
68        let original_length = self.buffer.len();
69        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
70            Err(Details::IllegalSingleObjectWriterState.into())
71        } else {
72            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
73            writer
74                .write_all(&self.buffer)
75                .map_err(Details::WriteBytes)?;
76            let len = self.buffer.len();
77            self.buffer.truncate(original_length);
78            Ok(len)
79        }
80    }
81
82    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
83    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
84        self.write_value_ref(&v, writer)
85    }
86}
87
88/// Writer that encodes messages according to the single object encoding v1 spec
89pub struct SpecificSingleObjectWriter<T>
90where
91    T: AvroSchema,
92{
93    resolved: ResolvedOwnedSchema,
94    header: Vec<u8>,
95    _model: PhantomData<T>,
96}
97
98impl<T> SpecificSingleObjectWriter<T>
99where
100    T: AvroSchema,
101{
102    pub fn new() -> AvroResult<Self> {
103        let schema = T::get_schema();
104        let header = RabinFingerprintHeader::from_schema(&schema).build_header();
105        let resolved = ResolvedOwnedSchema::new(schema)?;
106        // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice
107        Ok(Self {
108            resolved,
109            header,
110            _model: PhantomData,
111        })
112    }
113
114    pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
115        let header = header_builder.build_header();
116        let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
117        Ok(Self {
118            resolved,
119            header,
120            _model: PhantomData,
121        })
122    }
123
124    /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead.
125    #[deprecated(since = "0.22.0", note = "Use new() instead")]
126    pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
127        Self::new()
128    }
129}
130
131impl<T> SpecificSingleObjectWriter<T>
132where
133    T: AvroSchema + Into<Value>,
134{
135    /// Write the value to the writer
136    ///
137    /// Returns the number of bytes written.
138    ///
139    /// Each call writes a complete single-object encoded message (header + data),
140    /// making each message independently decodable.
141    pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
142        writer
143            .write_all(&self.header)
144            .map_err(Details::WriteBytes)?;
145        let value: Value = data.into();
146        let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
147        Ok(bytes + self.header.len())
148    }
149}
150
151impl<T> SpecificSingleObjectWriter<T>
152where
153    T: AvroSchema + Serialize,
154{
155    /// Write the object to the writer.
156    ///
157    /// Returns the number of bytes written.
158    ///
159    /// Each call writes a complete single-object encoded message (header + data),
160    /// making each message independently decodable.
161    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
162        writer
163            .write_all(&self.header)
164            .map_err(Details::WriteBytes)?;
165
166        let mut serializer = SchemaAwareWriteSerializer::new(
167            writer,
168            self.resolved.get_root_schema(),
169            self.resolved.get_names(),
170            None,
171        );
172        let bytes = data.serialize(&mut serializer)?;
173
174        Ok(bytes + self.header.len())
175    }
176
177    /// Write the object to the writer.
178    ///
179    /// Returns the number of bytes written.
180    ///
181    /// Each call writes a complete single-object encoded message (header + data),
182    /// making each message independently decodable.
183    pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
184        self.write_ref(&data, writer)
185    }
186}
187
188fn write_value_ref_owned_resolved<W: Write>(
189    resolved_schema: &ResolvedOwnedSchema,
190    value: &Value,
191    writer: &mut W,
192) -> AvroResult<usize> {
193    let root_schema = resolved_schema.get_root_schema();
194    if let Some(reason) = value.validate_internal(
195        root_schema,
196        resolved_schema.get_names(),
197        root_schema.namespace(),
198    ) {
199        return Err(Details::ValidationWithReason {
200            value: value.clone(),
201            schema: root_schema.clone(),
202            reason,
203        }
204        .into());
205    }
206    encode_internal(
207        value,
208        root_schema,
209        resolved_schema.get_names(),
210        root_schema.namespace(),
211        writer,
212    )
213}
214
215#[cfg(test)]
216mod tests {
217    use apache_avro_test_helper::TestResult;
218    use uuid::Uuid;
219
220    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin};
221
222    use super::*;
223
224    #[derive(Serialize, Clone)]
225    struct TestSingleObjectWriter {
226        a: i64,
227        b: f64,
228        c: Vec<String>,
229    }
230
231    impl AvroSchema for TestSingleObjectWriter {
232        fn get_schema() -> Schema {
233            let schema = r#"
234            {
235                "type":"record",
236                "name":"TestSingleObjectWrtierSerialize",
237                "fields":[
238                    {
239                        "name":"a",
240                        "type":"long"
241                    },
242                    {
243                        "name":"b",
244                        "type":"double"
245                    },
246                    {
247                        "name":"c",
248                        "type":{
249                            "type":"array",
250                            "items":"string"
251                        }
252                    }
253                ]
254            }
255            "#;
256            Schema::parse_str(schema).unwrap()
257        }
258    }
259
260    impl From<TestSingleObjectWriter> for Value {
261        fn from(obj: TestSingleObjectWriter) -> Value {
262            Value::Record(vec![
263                ("a".into(), obj.a.into()),
264                ("b".into(), obj.b.into()),
265                (
266                    "c".into(),
267                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
268                ),
269            ])
270        }
271    }
272
273    #[test]
274    fn test_single_object_writer() -> TestResult {
275        let mut buf: Vec<u8> = Vec::new();
276        let obj = TestSingleObjectWriter {
277            a: 300,
278            b: 34.555,
279            c: vec!["cat".into(), "dog".into()],
280        };
281        let mut writer = GenericSingleObjectWriter::new_with_capacity(
282            &TestSingleObjectWriter::get_schema(),
283            1024,
284        )
285        .expect("Should resolve schema");
286        let value = obj.into();
287        let written_bytes = writer
288            .write_value_ref(&value, &mut buf)
289            .expect("Error serializing properly");
290
291        assert!(buf.len() > 10, "no bytes written");
292        assert_eq!(buf.len(), written_bytes);
293        assert_eq!(buf[0], 0xC3);
294        assert_eq!(buf[1], 0x01);
295        assert_eq!(
296            &buf[2..10],
297            &TestSingleObjectWriter::get_schema()
298                .fingerprint::<Rabin>()
299                .bytes[..]
300        );
301        let mut msg_binary = Vec::new();
302        encode(
303            &value,
304            &TestSingleObjectWriter::get_schema(),
305            &mut msg_binary,
306        )
307        .expect("encode should have failed by here as a dependency of any writing");
308        assert_eq!(&buf[10..], &msg_binary[..]);
309
310        Ok(())
311    }
312
313    #[test]
314    fn test_single_object_writer_with_header_builder() -> TestResult {
315        let mut buf: Vec<u8> = Vec::new();
316        let obj = TestSingleObjectWriter {
317            a: 300,
318            b: 34.555,
319            c: vec!["cat".into(), "dog".into()],
320        };
321        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
322        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
323        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
324            &TestSingleObjectWriter::get_schema(),
325            1024,
326            header_builder,
327        )
328        .expect("Should resolve schema");
329        let value = obj.into();
330        writer
331            .write_value_ref(&value, &mut buf)
332            .expect("Error serializing properly");
333
334        assert_eq!(buf[0], 0x03);
335        assert_eq!(buf[1], 0x00);
336        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
337        Ok(())
338    }
339
340    #[test]
341    fn test_writer_parity() -> TestResult {
342        let obj1 = TestSingleObjectWriter {
343            a: 300,
344            b: 34.555,
345            c: vec!["cat".into(), "dog".into()],
346        };
347
348        let mut buf1: Vec<u8> = Vec::new();
349        let mut buf2: Vec<u8> = Vec::new();
350        let mut buf3: Vec<u8> = Vec::new();
351        let mut buf4: Vec<u8> = Vec::new();
352
353        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
354            &TestSingleObjectWriter::get_schema(),
355            1024,
356        )
357        .expect("Should resolve schema");
358        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
359            .expect("Resolved should pass");
360        specific_writer
361            .write_ref(&obj1, &mut buf1)
362            .expect("Serialization expected");
363        specific_writer
364            .write_ref(&obj1, &mut buf2)
365            .expect("Serialization expected");
366        specific_writer
367            .write_value(obj1.clone(), &mut buf3)
368            .expect("Serialization expected");
369
370        generic_writer
371            .write_value(obj1.into(), &mut buf4)
372            .expect("Serialization expected");
373
374        assert_eq!(buf1, buf2);
375        assert_eq!(buf2, buf3);
376        assert_eq!(buf3, buf4);
377
378        Ok(())
379    }
380
381    #[test]
382    fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
383        #[derive(Serialize)]
384        struct Recursive {
385            field: bool,
386            recurse: Option<Box<Recursive>>,
387        }
388
389        impl AvroSchema for Recursive {
390            fn get_schema() -> Schema {
391                Schema::parse_str(
392                    r#"{
393                    "name": "Recursive",
394                    "type": "record",
395                    "fields": [
396                        { "name": "field", "type": "boolean" },
397                        { "name": "recurse", "type": ["null", "Recursive"] }
398                    ]
399                }"#,
400                )
401                .unwrap()
402            }
403        }
404
405        let mut buffer = Vec::new();
406        let writer = SpecificSingleObjectWriter::new()?;
407
408        writer.write(
409            Recursive {
410                field: true,
411                recurse: Some(Box::new(Recursive {
412                    field: false,
413                    recurse: None,
414                })),
415            },
416            &mut buffer,
417        )?;
418        assert_eq!(
419            buffer,
420            &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
421        );
422
423        Ok(())
424    }
425}