Skip to main content

apache_avro/writer/
datum.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bon::bon;
19use serde::Serialize;
20use std::io::Write;
21
22use crate::{
23    AvroResult, Schema,
24    encode::encode_internal,
25    error::Details,
26    schema::{NamesRef, ResolvedSchema},
27    serde::ser_schema::{Config, SchemaAwareSerializer},
28    types::Value,
29    util::is_human_readable,
30};
31
32/// Writer for writing raw Avro data.
33///
34/// This is most likely not what you need. Most users should use [`Writer`][crate::Writer],
35/// [`GenericSingleObjectWriter`][crate::GenericSingleObjectWriter], or
36/// [`SpecificSingleObjectWriter`][crate::SpecificSingleObjectWriter] instead.
37pub struct GenericDatumWriter<'s> {
38    schema: &'s Schema,
39    resolved: ResolvedSchema<'s>,
40    validate: bool,
41    human_readable: bool,
42    target_block_size: Option<usize>,
43}
44
45#[bon]
46impl<'s> GenericDatumWriter<'s> {
47    /// Configure a new writer.
48    #[builder]
49    pub fn new(
50        /// The schema for the data that will be written
51        #[builder(start_fn)]
52        schema: &'s Schema,
53        /// Already resolved schemata that will be used to resolve references in the writer's schema.
54        ///
55        /// You can also use [`Self::schemata`] instead.
56        resolved_schemata: Option<ResolvedSchema<'s>>,
57        /// Validate values against the writer schema before writing them.
58        ///
59        /// Defaults to `true`.
60        ///
61        /// Setting this to `false` and writing values that don't match the schema will make the
62        /// written data unreadable.
63        #[builder(default = true)]
64        validate: bool,
65        /// At what block size to start a new block (for arrays and maps).
66        ///
67        /// This is a minimum value, the block size will always be larger than this except for the last
68        /// block.
69        ///
70        /// When set to `None` all values will be written in a single block. This can be faster as no
71        /// intermediate buffer is used, but seeking through written data will be slower.
72        target_block_size: Option<usize>,
73        /// Should [`Serialize`] implementations pick a human readable representation.
74        ///
75        /// It is recommended to set this to `false`.
76        #[builder(default = is_human_readable())]
77        human_readable: bool,
78    ) -> AvroResult<Self> {
79        let resolved = if let Some(resolved) = resolved_schemata {
80            resolved
81        } else {
82            ResolvedSchema::try_from(schema)?
83        };
84        Ok(Self {
85            schema,
86            resolved,
87            validate,
88            human_readable,
89            target_block_size,
90        })
91    }
92}
93
94impl<'s, S: generic_datum_writer_builder::State> GenericDatumWriterBuilder<'s, S> {
95    /// Set the schemata that will be used to resolve any references in the schema.
96    ///
97    /// This is equivalent to `.resolved_schemata(ResolvedSchema::new_with_schemata(schemata)?)`.
98    /// If you already have a [`ResolvedSchema`], use that function instead.
99    pub fn schemata(
100        self,
101        schemata: Vec<&'s Schema>,
102    ) -> AvroResult<
103        GenericDatumWriterBuilder<'s, generic_datum_writer_builder::SetResolvedSchemata<S>>,
104    >
105    where
106        S::ResolvedSchemata: generic_datum_writer_builder::IsUnset,
107    {
108        let resolved = ResolvedSchema::new_with_schemata(schemata)?;
109        Ok(self.resolved_schemata(resolved))
110    }
111}
112
113impl GenericDatumWriter<'_> {
114    /// Write a value to the writer.
115    pub fn write_value<W: Write, V: Into<Value>>(
116        &self,
117        writer: &mut W,
118        value: V,
119    ) -> AvroResult<usize> {
120        let value = value.into();
121        self.write_value_ref(writer, &value)
122    }
123
124    /// Write a value to the writer.
125    pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
126        if self.validate
127            && value
128                .validate_internal(self.schema, self.resolved.get_names(), None)
129                .is_some()
130        {
131            return Err(Details::Validation.into());
132        }
133        encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
134    }
135
136    /// Write a value to a [`Vec`].
137    pub fn write_value_to_vec<V: Into<Value>>(&self, value: V) -> AvroResult<Vec<u8>> {
138        let mut vec = Vec::new();
139        self.write_value(&mut vec, value)?;
140        Ok(vec)
141    }
142
143    /// Serialize `T` to the writer.
144    pub fn write_ser<W: Write, T: Serialize>(
145        &self,
146        writer: &mut W,
147        value: &T,
148    ) -> AvroResult<usize> {
149        let config = Config {
150            names: self.resolved.get_names(),
151            target_block_size: self.target_block_size,
152            human_readable: self.human_readable,
153        };
154        value.serialize(SchemaAwareSerializer::new(writer, self.schema, config)?)
155    }
156
157    /// Serialize `T` to a [`Vec`].
158    pub fn write_ser_to_vec<T: Serialize>(&self, value: &T) -> AvroResult<Vec<u8>> {
159        let mut vec = Vec::new();
160        self.write_ser(&mut vec, value)?;
161        Ok(vec)
162    }
163}
164
165/// Deprecated. Use [`GenericDatumWriter`] instead.
166///
167/// This is equivalent to:
168/// ```ignore
169/// GenericDatumWriter::builder(schema)
170///     .build()?
171///     .write_value_to_vec(value)
172/// ```
173///
174/// Encode a value into raw Avro data, also performs schema validation.
175///
176/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
177/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
178/// you are doing, instead.
179///
180/// [`Writer`]: crate::Writer
181#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
182pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
183    GenericDatumWriter::builder(schema)
184        .build()?
185        .write_value_to_vec(value)
186}
187
188/// Write the referenced [Serialize]able object to the provided [Write] object.
189///
190/// It is recommended to use [`GenericDatumWriter`] instead.
191///
192/// Returns a result with the number of bytes written.
193///
194/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
195/// markers; use [`Writer::append_ser`] to be fully Avro-compatible
196/// if you don't know what you are doing, instead.
197///
198/// [`Writer::append_ser`]: crate::Writer::append_ser
199pub fn write_avro_datum_ref<T: Serialize, W: Write>(
200    schema: &Schema,
201    names: &NamesRef,
202    data: &T,
203    writer: &mut W,
204) -> AvroResult<usize> {
205    let config = Config {
206        names,
207        target_block_size: None,
208        human_readable: is_human_readable(),
209    };
210    data.serialize(SchemaAwareSerializer::new(writer, schema, config)?)
211}
212
213/// Deprecated. Use [`GenericDatumWriter`] instead.
214///
215/// This is equivalent to:
216/// ```ignore
217/// GenericDatumWriter::builder(schema)
218///     .schemata(schemata)?
219///     .build()?
220///     .write_value_to_vec(value)
221/// ```
222///
223/// Encode a value into raw Avro data, also performs schema validation.
224///
225/// If the provided `schema` is incomplete then its dependencies must be
226/// provided in `schemata`
227#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
228pub fn to_avro_datum_schemata<T: Into<Value>>(
229    schema: &Schema,
230    schemata: Vec<&Schema>,
231    value: T,
232) -> AvroResult<Vec<u8>> {
233    GenericDatumWriter::builder(schema)
234        .schemata(schemata)?
235        .build()?
236        .write_value_to_vec(value)
237}
238
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;

    use super::*;
    use crate::reader::datum::GenericDatumReader;
    use crate::{
        Days, Decimal, Duration, Millis, Months,
        schema::{DecimalSchema, FixedSchema, InnerDecimalSchema, Name},
        types::Record,
        util::zig_i64,
    };

    /// Record with a `long` field `a` (default 42) and a `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// A nullable `long`.
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;

    #[test]
    fn test_to_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let value: Value = record.into();

        let mut expected = Vec::new();
        zig_i64(27, &mut expected)?;
        zig_i64(3, &mut expected)?;
        expected.extend([b'f', b'o', b'o']);

        let written = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(value.clone())?;
        assert_eq!(written, expected);

        // The deprecated wrapper is documented as equivalent; pin that it still is.
        // `allow` rather than `expect`: the call is intentional, only the lint is silenced.
        #[allow(deprecated)]
        let legacy = to_avro_datum(&schema, value)?;
        assert_eq!(legacy, expected);

        Ok(())
    }

    #[test]
    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
        #[derive(Serialize)]
        struct TestStruct {
            a: i64,
            b: String,
        }

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer: Vec<u8> = Vec::new();
        let data = TestStruct {
            a: 27,
            b: "foo".to_string(),
        };

        let mut expected = Vec::new();
        zig_i64(27, &mut expected)?;
        zig_i64(3, &mut expected)?;
        expected.extend([b'f', b'o', b'o']);

        let bytes = GenericDatumWriter::builder(&schema)
            .build()?
            .write_ser(&mut writer, &data)?;

        assert_eq!(bytes, expected.len());
        assert_eq!(writer, expected);

        // Exercise the free function this test is named after; it must agree with the writer.
        let resolved = ResolvedSchema::try_from(&schema)?;
        let mut direct: Vec<u8> = Vec::new();
        let direct_bytes =
            write_avro_datum_ref(&schema, resolved.get_names(), &data, &mut direct)?;

        assert_eq!(direct_bytes, expected.len());
        assert_eq!(direct, expected);

        Ok(())
    }

    #[test]
    fn test_union_not_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(1, Box::new(Value::Long(3)));

        let mut expected = Vec::new();
        zig_i64(1, &mut expected)?;
        zig_i64(3, &mut expected)?;

        let written = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(union)?;
        assert_eq!(written, expected);

        Ok(())
    }

    #[test]
    fn test_union_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(0, Box::new(Value::Null));

        let mut expected = Vec::new();
        zig_i64(0, &mut expected)?;

        let written = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(union)?;
        assert_eq!(written, expected);

        Ok(())
    }

    /// Round-trip check for a logical type.
    ///
    /// Parses `schema_str` and compares it with `expected_schema`. It then checks that `value`
    /// encodes to the same bytes as `raw_value` under the underlying `raw_schema`. Finally it
    /// checks that decoding those bytes with the logical schema yields `value` again.
    #[expect(
        clippy::needless_pass_by_value,
        reason = "Value does not implement PartialEq<&Value>"
    )]
    fn logical_type_test<T: Into<Value> + Clone>(
        schema_str: &'static str,

        expected_schema: &Schema,
        value: Value,

        raw_schema: &Schema,
        raw_value: T,
    ) -> TestResult {
        let schema = Schema::parse_str(schema_str)?;
        assert_eq!(&schema, expected_schema);
        // The serialized format should be the same as the schema.
        let ser = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(value.clone())?;
        let raw_ser = GenericDatumWriter::builder(raw_schema)
            .build()?
            .write_value_to_vec(raw_value)?;
        assert_eq!(ser, raw_ser);

        // Should deserialize from the schema into the logical type.
        let mut r = ser.as_slice();
        let de = GenericDatumReader::builder(&schema)
            .build()?
            .read_value(&mut r)?;
        assert_eq!(de, value);
        Ok(())
    }

    #[test]
    fn date() -> TestResult {
        logical_type_test(
            r#"{"type": "int", "logicalType": "date"}"#,
            &Schema::Date,
            Value::Date(1_i32),
            &Schema::Int,
            1_i32,
        )
    }

    #[test]
    fn time_millis() -> TestResult {
        logical_type_test(
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            &Schema::TimeMillis,
            Value::TimeMillis(1_i32),
            &Schema::Int,
            1_i32,
        )
    }

    #[test]
    fn time_micros() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            &Schema::TimeMicros,
            Value::TimeMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn timestamp_millis() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            &Schema::TimestampMillis,
            Value::TimestampMillis(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn timestamp_micros() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            &Schema::TimestampMicros,
            Value::TimestampMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn decimal_fixed() -> TestResult {
        let size = 30;
        let fixed = FixedSchema {
            name: Name::new("decimal")?,
            aliases: None,
            doc: None,
            size,
            attributes: Default::default(),
        };
        let inner = InnerDecimalSchema::Fixed(fixed.clone());
        let value = vec![0u8; size];
        logical_type_test(
            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
            &Schema::Decimal(DecimalSchema {
                precision: 20,
                scale: 5,
                inner,
            }),
            Value::Decimal(Decimal::from(value.clone())),
            &Schema::Fixed(fixed),
            Value::Fixed(size, value),
        )
    }

    #[test]
    fn decimal_bytes() -> TestResult {
        let value = vec![0u8; 10];
        logical_type_test(
            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
            &Schema::Decimal(DecimalSchema {
                precision: 4,
                scale: 3,
                inner: InnerDecimalSchema::Bytes,
            }),
            Value::Decimal(Decimal::from(value.clone())),
            &Schema::Bytes,
            value,
        )
    }

    #[test]
    fn duration() -> TestResult {
        // One fixed(12) definition shared by the logical schema and the raw schema.
        let fixed = FixedSchema {
            name: Name::new("duration")?,
            aliases: None,
            doc: None,
            size: 12,
            attributes: Default::default(),
        };
        let value = Value::Duration(Duration::new(
            Months::new(256),
            Days::new(512),
            Millis::new(1024),
        ));
        logical_type_test(
            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
            &Schema::Duration(fixed.clone()),
            value,
            &Schema::Fixed(fixed),
            // 256, 512 and 1024, each as a little-endian u32.
            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
        )
    }
}