apache_avro/writer/
datum.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bon::bon;
19use serde::Serialize;
20use std::io::Write;
21
22use crate::{
23    AvroResult, Schema,
24    encode::encode_internal,
25    error::Details,
26    schema::{NamesRef, ResolvedSchema},
27    serde::ser_schema::SchemaAwareWriteSerializer,
28    types::Value,
29};
30
31/// Writer for writing raw Avro data.
32///
33/// This is most likely not what you need. Most users should use [`Writer`][crate::Writer],
34/// [`GenericSingleObjectWriter`][crate::GenericSingleObjectWriter], or
35/// [`SpecificSingleObjectWriter`][crate::SpecificSingleObjectWriter] instead.
36pub struct GenericDatumWriter<'s> {
37    schema: &'s Schema,
38    resolved: ResolvedSchema<'s>,
39    validate: bool,
40}
41
42#[bon]
43impl<'s> GenericDatumWriter<'s> {
44    /// Configure a new writer.
45    #[builder]
46    pub fn new(
47        /// The schema for the data that will be written
48        #[builder(start_fn)]
49        schema: &'s Schema,
50        /// Already resolved schemata that will be used to resolve references in the writer's schema.
51        ///
52        /// You can also use [`Self::schemata`] instead.
53        resolved_schemata: Option<ResolvedSchema<'s>>,
54        /// Validate values against the writer schema before writing them.
55        ///
56        /// Defaults to `true`.
57        ///
58        /// Setting this to `false` and writing values that don't match the schema will make the
59        /// written data unreadable.
60        #[builder(default = true)]
61        validate: bool,
62    ) -> AvroResult<Self> {
63        let resolved = if let Some(resolved) = resolved_schemata {
64            resolved
65        } else {
66            ResolvedSchema::try_from(schema)?
67        };
68        Ok(Self {
69            schema,
70            resolved,
71            validate,
72        })
73    }
74}
75
76impl<'s, S: generic_datum_writer_builder::State> GenericDatumWriterBuilder<'s, S> {
77    /// Set the schemata that will be used to resolve any references in the schema.
78    ///
79    /// This is equivalent to `.resolved_schemata(ResolvedSchema::new_with_schemata(schemata)?)`.
80    /// If you already have a [`ResolvedSchema`], use that function instead.
81    pub fn schemata(
82        self,
83        schemata: Vec<&'s Schema>,
84    ) -> AvroResult<
85        GenericDatumWriterBuilder<'s, generic_datum_writer_builder::SetResolvedSchemata<S>>,
86    >
87    where
88        S::ResolvedSchemata: generic_datum_writer_builder::IsUnset,
89    {
90        let resolved = ResolvedSchema::new_with_schemata(schemata)?;
91        Ok(self.resolved_schemata(resolved))
92    }
93}
94
95impl GenericDatumWriter<'_> {
96    /// Write a value to the writer.
97    pub fn write_value<W: Write, V: Into<Value>>(
98        &self,
99        writer: &mut W,
100        value: V,
101    ) -> AvroResult<usize> {
102        let value = value.into();
103        self.write_value_ref(writer, &value)
104    }
105
106    /// Write a value to the writer.
107    pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
108        if self.validate
109            && value
110                .validate_internal(self.schema, self.resolved.get_names(), None)
111                .is_some()
112        {
113            return Err(Details::Validation.into());
114        }
115        encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
116    }
117
118    /// Write a value to a [`Vec`].
119    pub fn write_value_to_vec<V: Into<Value>>(&self, value: V) -> AvroResult<Vec<u8>> {
120        let mut vec = Vec::new();
121        self.write_value(&mut vec, value)?;
122        Ok(vec)
123    }
124
125    /// Serialize `T` to the writer.
126    pub fn write_ser<W: Write, T: Serialize>(
127        &self,
128        writer: &mut W,
129        value: &T,
130    ) -> AvroResult<usize> {
131        let mut serializer =
132            SchemaAwareWriteSerializer::new(writer, self.schema, self.resolved.get_names(), None);
133        value.serialize(&mut serializer)
134    }
135
136    /// Serialize `T` to a [`Vec`].
137    pub fn write_ser_to_vec<T: Serialize>(&self, value: &T) -> AvroResult<Vec<u8>> {
138        let mut vec = Vec::new();
139        self.write_ser(&mut vec, value)?;
140        Ok(vec)
141    }
142}
143
144/// Deprecated. Use [`GenericDatumWriter`] instead.
145///
146/// This is equivalent to:
147/// ```ignore
148/// GenericDatumWriter::builder(schema)
149///     .build()?
150///     .write_value_to_vec(value)
151/// ```
152///
153/// Encode a value into raw Avro data, also performs schema validation.
154///
155/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
156/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
157/// you are doing, instead.
158///
159/// [`Writer`]: crate::Writer
160#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
161pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
162    GenericDatumWriter::builder(schema)
163        .build()?
164        .write_value_to_vec(value)
165}
166
167/// Write the referenced [Serialize]able object to the provided [Write] object.
168///
169/// It is recommended to use [`GenericDatumWriter`] instead.
170///
171/// Returns a result with the number of bytes written.
172///
173/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
174/// markers; use [`Writer::append_ser`] to be fully Avro-compatible
175/// if you don't know what you are doing, instead.
176///
177/// [`Writer::append_ser`]: crate::Writer::append_ser
178pub fn write_avro_datum_ref<T: Serialize, W: Write>(
179    schema: &Schema,
180    names: &NamesRef,
181    data: &T,
182    writer: &mut W,
183) -> AvroResult<usize> {
184    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
185    data.serialize(&mut serializer)
186}
187
188/// Deprecated. Use [`GenericDatumWriter`] instead.
189///
190/// This is equivalent to:
191/// ```ignore
192/// GenericDatumWriter::builder(schema)
193///     .schemata(schemata)?
194///     .build()?
195///     .write_value_to_vec(value)
196/// ```
197///
198/// Encode a value into raw Avro data, also performs schema validation.
199///
200/// If the provided `schema` is incomplete then its dependencies must be
201/// provided in `schemata`
202#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
203pub fn to_avro_datum_schemata<T: Into<Value>>(
204    schema: &Schema,
205    schemata: Vec<&Schema>,
206    value: T,
207) -> AvroResult<Vec<u8>> {
208    GenericDatumWriter::builder(schema)
209        .schemata(schemata)?
210        .build()?
211        .write_value_to_vec(value)
212}
213
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;

    use super::*;
    use crate::reader::datum::GenericDatumReader;
    use crate::{
        Days, Decimal, Duration, Millis, Months,
        schema::{DecimalSchema, FixedSchema, InnerDecimalSchema, Name},
        types::Record,
        util::zig_i64,
    };

    /// Record schema with a `long` field `a` (default 42) and a `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// Union schema of `null` and `long`.
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;

    /// A record value is written as its fields, one after the other.
    #[test]
    fn test_to_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        // Field `a` = 27, then field `b` as length 3 followed by the bytes of "foo".
        let mut expected: Vec<u8> = Vec::new();
        zig_i64(27, &mut expected)?;
        zig_i64(3, &mut expected)?;
        expected.extend_from_slice(b"foo");

        let datum_writer = GenericDatumWriter::builder(&schema).build()?;
        let written = datum_writer.write_value_to_vec(record)?;
        assert_eq!(written, expected);

        Ok(())
    }

    /// A `Serialize` struct is written like the equivalent record, and the returned count
    /// matches the number of bytes that ended up in the output.
    #[test]
    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
        #[derive(Serialize)]
        struct TestStruct {
            a: i64,
            b: String,
        }

        let schema = Schema::parse_str(SCHEMA)?;
        let data = TestStruct {
            a: 27,
            b: "foo".to_string(),
        };

        let mut expected: Vec<u8> = Vec::new();
        zig_i64(27, &mut expected)?;
        zig_i64(3, &mut expected)?;
        expected.extend_from_slice(b"foo");

        let mut output: Vec<u8> = Vec::new();
        let datum_writer = GenericDatumWriter::builder(&schema).build()?;
        let byte_count = datum_writer.write_ser(&mut output, &data)?;

        assert_eq!(byte_count, expected.len());
        assert_eq!(output, expected);

        Ok(())
    }

    /// A union holding the `long` branch is written as branch index 1 followed by the value.
    #[test]
    fn test_union_not_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(1, Box::new(Value::Long(3)));

        let mut expected: Vec<u8> = Vec::new();
        zig_i64(1, &mut expected)?;
        zig_i64(3, &mut expected)?;

        let datum_writer = GenericDatumWriter::builder(&schema).build()?;
        let written = datum_writer.write_value_to_vec(union)?;
        assert_eq!(written, expected);

        Ok(())
    }

    /// A union holding the `null` branch is written as branch index 0 and nothing else.
    #[test]
    fn test_union_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(0, Box::new(Value::Null));

        let mut expected: Vec<u8> = Vec::new();
        zig_i64(0, &mut expected)?;

        let datum_writer = GenericDatumWriter::builder(&schema).build()?;
        let written = datum_writer.write_value_to_vec(union)?;
        assert_eq!(written, expected);

        Ok(())
    }

    /// Shared driver for the logical type tests.
    ///
    /// Checks that `schema_str` parses to `expected_schema`, that `value` written with that
    /// schema produces the same bytes as `raw_value` written with `raw_schema`, and that
    /// reading those bytes back with the parsed schema yields `value` again.
    fn logical_type_test<T: Into<Value> + Clone>(
        schema_str: &'static str,

        expected_schema: &Schema,
        value: Value,

        raw_schema: &Schema,
        raw_value: T,
    ) -> TestResult {
        let parsed = Schema::parse_str(schema_str)?;
        assert_eq!(&parsed, expected_schema);

        // The logical type must serialize exactly like its underlying raw type.
        let logical_writer = GenericDatumWriter::builder(&parsed).build()?;
        let logical_bytes = logical_writer.write_value_to_vec(value.clone())?;
        let raw_writer = GenericDatumWriter::builder(raw_schema).build()?;
        let raw_bytes = raw_writer.write_value_to_vec(raw_value)?;
        assert_eq!(logical_bytes, raw_bytes);

        // Reading with the logical schema must give back the logical value.
        let datum_reader = GenericDatumReader::builder(&parsed).build()?;
        let mut remaining = logical_bytes.as_slice();
        let decoded = datum_reader.read_value(&mut remaining)?;
        assert_eq!(decoded, value);
        Ok(())
    }

    /// `date` is compared against a plain `int`.
    #[test]
    fn date() -> TestResult {
        let json = r#"{"type": "int", "logicalType": "date"}"#;
        logical_type_test(json, &Schema::Date, Value::Date(1_i32), &Schema::Int, 1_i32)
    }

    /// `time-millis` is compared against a plain `int`.
    #[test]
    fn time_millis() -> TestResult {
        let json = r#"{"type": "int", "logicalType": "time-millis"}"#;
        logical_type_test(
            json,
            &Schema::TimeMillis,
            Value::TimeMillis(1_i32),
            &Schema::Int,
            1_i32,
        )
    }

    /// `time-micros` is compared against a plain `long`.
    #[test]
    fn time_micros() -> TestResult {
        let json = r#"{"type": "long", "logicalType": "time-micros"}"#;
        logical_type_test(
            json,
            &Schema::TimeMicros,
            Value::TimeMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    /// `timestamp-millis` is compared against a plain `long`.
    #[test]
    fn timestamp_millis() -> TestResult {
        let json = r#"{"type": "long", "logicalType": "timestamp-millis"}"#;
        logical_type_test(
            json,
            &Schema::TimestampMillis,
            Value::TimestampMillis(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    /// `timestamp-micros` is compared against a plain `long`.
    #[test]
    fn timestamp_micros() -> TestResult {
        let json = r#"{"type": "long", "logicalType": "timestamp-micros"}"#;
        logical_type_test(
            json,
            &Schema::TimestampMicros,
            Value::TimestampMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    /// A decimal backed by a `fixed` is compared against that `fixed`.
    #[test]
    fn decimal_fixed() -> TestResult {
        let size = 30;
        let fixed = FixedSchema {
            name: Name::new("decimal")?,
            aliases: None,
            doc: None,
            size,
            attributes: Default::default(),
        };
        let decimal_schema = Schema::Decimal(DecimalSchema {
            precision: 20,
            scale: 5,
            inner: InnerDecimalSchema::Fixed(fixed.clone()),
        });
        let raw_bytes = vec![0u8; size];
        logical_type_test(
            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
            &decimal_schema,
            Value::Decimal(Decimal::from(raw_bytes.clone())),
            &Schema::Fixed(fixed),
            Value::Fixed(size, raw_bytes),
        )
    }

    /// A decimal backed by `bytes` is compared against plain `bytes`.
    #[test]
    fn decimal_bytes() -> TestResult {
        let raw_bytes = vec![0u8; 10];
        let decimal_schema = Schema::Decimal(DecimalSchema {
            precision: 4,
            scale: 3,
            inner: InnerDecimalSchema::Bytes,
        });
        logical_type_test(
            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
            &decimal_schema,
            Value::Decimal(Decimal::from(raw_bytes.clone())),
            &Schema::Bytes,
            raw_bytes,
        )
    }

    /// A duration is compared against the 12-byte `fixed` it is stored in.
    #[test]
    fn duration() -> TestResult {
        let raw_schema = Schema::Fixed(FixedSchema {
            name: Name::new("duration")?,
            aliases: None,
            doc: None,
            size: 12,
            attributes: Default::default(),
        });
        let duration_schema = Schema::Duration(FixedSchema {
            name: Name::try_from("duration").expect("Name is valid"),
            aliases: None,
            doc: None,
            size: 12,
            attributes: Default::default(),
        });
        let logical_value = Value::Duration(Duration::new(
            Months::new(256),
            Days::new(512),
            Millis::new(1024),
        ));
        logical_type_test(
            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
            &duration_schema,
            logical_value,
            &raw_schema,
            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
        )
    }
}