1use bon::bon;
19use serde::Serialize;
20use std::io::Write;
21
22use crate::{
23 AvroResult, Schema,
24 encode::encode_internal,
25 error::Details,
26 schema::{NamesRef, ResolvedSchema},
27 serde::ser_schema::SchemaAwareWriteSerializer,
28 types::Value,
29};
30
/// Writer for individual Avro datums that all share one schema.
///
/// Bundles the schema to write with, the resolved form of that schema (used for
/// name lookups), and a switch that decides whether [`Value`]s are validated
/// before they are encoded.
pub struct GenericDatumWriter<'s> {
    /// Schema that values are validated against and encoded/serialized with.
    schema: &'s Schema,
    /// Resolved schema; its names are passed to validation, encoding and serialization.
    resolved: ResolvedSchema<'s>,
    /// When `true`, `write_value_ref` validates the value before encoding it.
    validate: bool,
}
41
42#[bon]
43impl<'s> GenericDatumWriter<'s> {
44 #[builder]
46 pub fn new(
47 #[builder(start_fn)]
49 schema: &'s Schema,
50 resolved_schemata: Option<ResolvedSchema<'s>>,
54 #[builder(default = true)]
61 validate: bool,
62 ) -> AvroResult<Self> {
63 let resolved = if let Some(resolved) = resolved_schemata {
64 resolved
65 } else {
66 ResolvedSchema::try_from(schema)?
67 };
68 Ok(Self {
69 schema,
70 resolved,
71 validate,
72 })
73 }
74}
75
76impl<'s, S: generic_datum_writer_builder::State> GenericDatumWriterBuilder<'s, S> {
77 pub fn schemata(
82 self,
83 schemata: Vec<&'s Schema>,
84 ) -> AvroResult<
85 GenericDatumWriterBuilder<'s, generic_datum_writer_builder::SetResolvedSchemata<S>>,
86 >
87 where
88 S::ResolvedSchemata: generic_datum_writer_builder::IsUnset,
89 {
90 let resolved = ResolvedSchema::new_with_schemata(schemata)?;
91 Ok(self.resolved_schemata(resolved))
92 }
93}
94
95impl GenericDatumWriter<'_> {
96 pub fn write_value<W: Write, V: Into<Value>>(
98 &self,
99 writer: &mut W,
100 value: V,
101 ) -> AvroResult<usize> {
102 let value = value.into();
103 self.write_value_ref(writer, &value)
104 }
105
106 pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
108 if self.validate
109 && value
110 .validate_internal(self.schema, self.resolved.get_names(), None)
111 .is_some()
112 {
113 return Err(Details::Validation.into());
114 }
115 encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
116 }
117
118 pub fn write_value_to_vec<V: Into<Value>>(&self, value: V) -> AvroResult<Vec<u8>> {
120 let mut vec = Vec::new();
121 self.write_value(&mut vec, value)?;
122 Ok(vec)
123 }
124
125 pub fn write_ser<W: Write, T: Serialize>(
127 &self,
128 writer: &mut W,
129 value: &T,
130 ) -> AvroResult<usize> {
131 let mut serializer =
132 SchemaAwareWriteSerializer::new(writer, self.schema, self.resolved.get_names(), None);
133 value.serialize(&mut serializer)
134 }
135
136 pub fn write_ser_to_vec<T: Serialize>(&self, value: &T) -> AvroResult<Vec<u8>> {
138 let mut vec = Vec::new();
139 self.write_ser(&mut vec, value)?;
140 Ok(vec)
141 }
142}
143
144#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
161pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
162 GenericDatumWriter::builder(schema)
163 .build()?
164 .write_value_to_vec(value)
165}
166
167pub fn write_avro_datum_ref<T: Serialize, W: Write>(
179 schema: &Schema,
180 names: &NamesRef,
181 data: &T,
182 writer: &mut W,
183) -> AvroResult<usize> {
184 let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
185 data.serialize(&mut serializer)
186}
187
188#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
203pub fn to_avro_datum_schemata<T: Into<Value>>(
204 schema: &Schema,
205 schemata: Vec<&Schema>,
206 value: T,
207) -> AvroResult<Vec<u8>> {
208 GenericDatumWriter::builder(schema)
209 .schemata(schemata)?
210 .build()?
211 .write_value_to_vec(value)
212}
213
214#[cfg(test)]
215mod tests {
216 use apache_avro_test_helper::TestResult;
217
218 use super::*;
219 use crate::reader::datum::GenericDatumReader;
220 use crate::{
221 Days, Decimal, Duration, Millis, Months,
222 schema::{DecimalSchema, FixedSchema, InnerDecimalSchema, Name},
223 types::Record,
224 util::zig_i64,
225 };
226
227 const SCHEMA: &str = r#"
228 {
229 "type": "record",
230 "name": "test",
231 "fields": [
232 {
233 "name": "a",
234 "type": "long",
235 "default": 42
236 },
237 {
238 "name": "b",
239 "type": "string"
240 }
241 ]
242 }
243 "#;
244
245 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
246
247 #[test]
248 fn test_to_avro_datum() -> TestResult {
249 let schema = Schema::parse_str(SCHEMA)?;
250 let mut record = Record::new(&schema).unwrap();
251 record.put("a", 27i64);
252 record.put("b", "foo");
253
254 let mut expected = Vec::new();
255 zig_i64(27, &mut expected)?;
256 zig_i64(3, &mut expected)?;
257 expected.extend([b'f', b'o', b'o']);
258
259 let written = GenericDatumWriter::builder(&schema)
260 .build()?
261 .write_value_to_vec(record)?;
262
263 assert_eq!(written, expected);
264
265 Ok(())
266 }
267
268 #[test]
269 fn avro_rs_193_write_avro_datum_ref() -> TestResult {
270 #[derive(Serialize)]
271 struct TestStruct {
272 a: i64,
273 b: String,
274 }
275
276 let schema = Schema::parse_str(SCHEMA)?;
277 let mut writer: Vec<u8> = Vec::new();
278 let data = TestStruct {
279 a: 27,
280 b: "foo".to_string(),
281 };
282
283 let mut expected = Vec::new();
284 zig_i64(27, &mut expected)?;
285 zig_i64(3, &mut expected)?;
286 expected.extend([b'f', b'o', b'o']);
287
288 let bytes = GenericDatumWriter::builder(&schema)
289 .build()?
290 .write_ser(&mut writer, &data)?;
291
292 assert_eq!(bytes, expected.len());
293 assert_eq!(writer, expected);
294
295 Ok(())
296 }
297
298 #[test]
299 fn test_union_not_null() -> TestResult {
300 let schema = Schema::parse_str(UNION_SCHEMA)?;
301 let union = Value::Union(1, Box::new(Value::Long(3)));
302
303 let mut expected = Vec::new();
304 zig_i64(1, &mut expected)?;
305 zig_i64(3, &mut expected)?;
306
307 let written = GenericDatumWriter::builder(&schema)
308 .build()?
309 .write_value_to_vec(union)?;
310 assert_eq!(written, expected);
311
312 Ok(())
313 }
314
315 #[test]
316 fn test_union_null() -> TestResult {
317 let schema = Schema::parse_str(UNION_SCHEMA)?;
318 let union = Value::Union(0, Box::new(Value::Null));
319
320 let mut expected = Vec::new();
321 zig_i64(0, &mut expected)?;
322
323 let written = GenericDatumWriter::builder(&schema)
324 .build()?
325 .write_value_to_vec(union)?;
326 assert_eq!(written, expected);
327
328 Ok(())
329 }
330
331 fn logical_type_test<T: Into<Value> + Clone>(
332 schema_str: &'static str,
333
334 expected_schema: &Schema,
335 value: Value,
336
337 raw_schema: &Schema,
338 raw_value: T,
339 ) -> TestResult {
340 let schema = Schema::parse_str(schema_str)?;
341 assert_eq!(&schema, expected_schema);
342 let ser = GenericDatumWriter::builder(&schema)
344 .build()?
345 .write_value_to_vec(value.clone())?;
346 let raw_ser = GenericDatumWriter::builder(raw_schema)
347 .build()?
348 .write_value_to_vec(raw_value)?;
349 assert_eq!(ser, raw_ser);
350
351 let mut r = ser.as_slice();
353 let de = GenericDatumReader::builder(&schema)
354 .build()?
355 .read_value(&mut r)?;
356 assert_eq!(de, value);
357 Ok(())
358 }
359
360 #[test]
361 fn date() -> TestResult {
362 logical_type_test(
363 r#"{"type": "int", "logicalType": "date"}"#,
364 &Schema::Date,
365 Value::Date(1_i32),
366 &Schema::Int,
367 1_i32,
368 )
369 }
370
371 #[test]
372 fn time_millis() -> TestResult {
373 logical_type_test(
374 r#"{"type": "int", "logicalType": "time-millis"}"#,
375 &Schema::TimeMillis,
376 Value::TimeMillis(1_i32),
377 &Schema::Int,
378 1_i32,
379 )
380 }
381
382 #[test]
383 fn time_micros() -> TestResult {
384 logical_type_test(
385 r#"{"type": "long", "logicalType": "time-micros"}"#,
386 &Schema::TimeMicros,
387 Value::TimeMicros(1_i64),
388 &Schema::Long,
389 1_i64,
390 )
391 }
392
393 #[test]
394 fn timestamp_millis() -> TestResult {
395 logical_type_test(
396 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
397 &Schema::TimestampMillis,
398 Value::TimestampMillis(1_i64),
399 &Schema::Long,
400 1_i64,
401 )
402 }
403
404 #[test]
405 fn timestamp_micros() -> TestResult {
406 logical_type_test(
407 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
408 &Schema::TimestampMicros,
409 Value::TimestampMicros(1_i64),
410 &Schema::Long,
411 1_i64,
412 )
413 }
414
415 #[test]
416 fn decimal_fixed() -> TestResult {
417 let size = 30;
418 let fixed = FixedSchema {
419 name: Name::new("decimal")?,
420 aliases: None,
421 doc: None,
422 size,
423 attributes: Default::default(),
424 };
425 let inner = InnerDecimalSchema::Fixed(fixed.clone());
426 let value = vec![0u8; size];
427 logical_type_test(
428 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
429 &Schema::Decimal(DecimalSchema {
430 precision: 20,
431 scale: 5,
432 inner,
433 }),
434 Value::Decimal(Decimal::from(value.clone())),
435 &Schema::Fixed(fixed),
436 Value::Fixed(size, value),
437 )
438 }
439
440 #[test]
441 fn decimal_bytes() -> TestResult {
442 let value = vec![0u8; 10];
443 logical_type_test(
444 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
445 &Schema::Decimal(DecimalSchema {
446 precision: 4,
447 scale: 3,
448 inner: InnerDecimalSchema::Bytes,
449 }),
450 Value::Decimal(Decimal::from(value.clone())),
451 &Schema::Bytes,
452 value,
453 )
454 }
455
456 #[test]
457 fn duration() -> TestResult {
458 let inner = Schema::Fixed(FixedSchema {
459 name: Name::new("duration")?,
460 aliases: None,
461 doc: None,
462 size: 12,
463 attributes: Default::default(),
464 });
465 let value = Value::Duration(Duration::new(
466 Months::new(256),
467 Days::new(512),
468 Millis::new(1024),
469 ));
470 logical_type_test(
471 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
472 &Schema::Duration(FixedSchema {
473 name: Name::try_from("duration").expect("Name is valid"),
474 aliases: None,
475 doc: None,
476 size: 12,
477 attributes: Default::default(),
478 }),
479 value,
480 &inner,
481 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
482 )
483 }
484}