1use bon::bon;
19use serde::Serialize;
20use std::io::Write;
21
22use crate::{
23 AvroResult, Schema,
24 encode::encode_internal,
25 error::Details,
26 schema::{NamesRef, ResolvedSchema},
27 serde::ser_schema::{Config, SchemaAwareSerializer},
28 types::Value,
29 util::is_human_readable,
30};
31
32pub struct GenericDatumWriter<'s> {
38 schema: &'s Schema,
39 resolved: ResolvedSchema<'s>,
40 validate: bool,
41 human_readable: bool,
42 target_block_size: Option<usize>,
43}
44
45#[bon]
46impl<'s> GenericDatumWriter<'s> {
47 #[builder]
49 pub fn new(
50 #[builder(start_fn)]
52 schema: &'s Schema,
53 resolved_schemata: Option<ResolvedSchema<'s>>,
57 #[builder(default = true)]
64 validate: bool,
65 target_block_size: Option<usize>,
73 #[builder(default = is_human_readable())]
77 human_readable: bool,
78 ) -> AvroResult<Self> {
79 let resolved = if let Some(resolved) = resolved_schemata {
80 resolved
81 } else {
82 ResolvedSchema::try_from(schema)?
83 };
84 Ok(Self {
85 schema,
86 resolved,
87 validate,
88 human_readable,
89 target_block_size,
90 })
91 }
92}
93
94impl<'s, S: generic_datum_writer_builder::State> GenericDatumWriterBuilder<'s, S> {
95 pub fn schemata(
100 self,
101 schemata: Vec<&'s Schema>,
102 ) -> AvroResult<
103 GenericDatumWriterBuilder<'s, generic_datum_writer_builder::SetResolvedSchemata<S>>,
104 >
105 where
106 S::ResolvedSchemata: generic_datum_writer_builder::IsUnset,
107 {
108 let resolved = ResolvedSchema::new_with_schemata(schemata)?;
109 Ok(self.resolved_schemata(resolved))
110 }
111}
112
113impl GenericDatumWriter<'_> {
114 pub fn write_value<W: Write, V: Into<Value>>(
116 &self,
117 writer: &mut W,
118 value: V,
119 ) -> AvroResult<usize> {
120 let value = value.into();
121 self.write_value_ref(writer, &value)
122 }
123
124 pub fn write_value_ref<W: Write>(&self, writer: &mut W, value: &Value) -> AvroResult<usize> {
126 if self.validate
127 && value
128 .validate_internal(self.schema, self.resolved.get_names(), None)
129 .is_some()
130 {
131 return Err(Details::Validation.into());
132 }
133 encode_internal(value, self.schema, self.resolved.get_names(), None, writer)
134 }
135
136 pub fn write_value_to_vec<V: Into<Value>>(&self, value: V) -> AvroResult<Vec<u8>> {
138 let mut vec = Vec::new();
139 self.write_value(&mut vec, value)?;
140 Ok(vec)
141 }
142
143 pub fn write_ser<W: Write, T: Serialize>(
145 &self,
146 writer: &mut W,
147 value: &T,
148 ) -> AvroResult<usize> {
149 let config = Config {
150 names: self.resolved.get_names(),
151 target_block_size: self.target_block_size,
152 human_readable: self.human_readable,
153 };
154 value.serialize(SchemaAwareSerializer::new(writer, self.schema, config)?)
155 }
156
157 pub fn write_ser_to_vec<T: Serialize>(&self, value: &T) -> AvroResult<Vec<u8>> {
159 let mut vec = Vec::new();
160 self.write_ser(&mut vec, value)?;
161 Ok(vec)
162 }
163}
164
165#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
182pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
183 GenericDatumWriter::builder(schema)
184 .build()?
185 .write_value_to_vec(value)
186}
187
188pub fn write_avro_datum_ref<T: Serialize, W: Write>(
200 schema: &Schema,
201 names: &NamesRef,
202 data: &T,
203 writer: &mut W,
204) -> AvroResult<usize> {
205 let config = Config {
206 names,
207 target_block_size: None,
208 human_readable: is_human_readable(),
209 };
210 data.serialize(SchemaAwareSerializer::new(writer, schema, config)?)
211}
212
213#[deprecated(since = "0.22.0", note = "Use GenericDatumWriter instead")]
228pub fn to_avro_datum_schemata<T: Into<Value>>(
229 schema: &Schema,
230 schemata: Vec<&Schema>,
231 value: T,
232) -> AvroResult<Vec<u8>> {
233 GenericDatumWriter::builder(schema)
234 .schemata(schemata)?
235 .build()?
236 .write_value_to_vec(value)
237}
238
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;

    use super::*;
    use crate::reader::datum::GenericDatumReader;
    use crate::{
        Days, Decimal, Duration, Millis, Months,
        schema::{DecimalSchema, FixedSchema, InnerDecimalSchema, Name},
        types::Record,
        util::zig_i64,
    };

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    const UNION_SCHEMA: &str = r#"["null", "long"]"#;

    #[test]
    fn test_to_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let mut expected = Vec::new();
        zig_i64(27, &mut expected)?;
        zig_i64(3, &mut expected)?;
        expected.extend([b'f', b'o', b'o']);

        let written = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(record)?;

        assert_eq!(written, expected);

        Ok(())
    }

    /// Resolving names through the custom `schemata` builder step must yield the same
    /// bytes as the default resolution from the schema alone.
    #[test]
    fn write_value_with_explicit_schemata() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let mut expected = Vec::new();
        zig_i64(27, &mut expected)?;
        zig_i64(3, &mut expected)?;
        expected.extend([b'f', b'o', b'o']);

        let written = GenericDatumWriter::builder(&schema)
            .schemata(vec![&schema])?
            .build()?
            .write_value_to_vec(record)?;

        assert_eq!(written, expected);

        Ok(())
    }

    #[test]
    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
        #[derive(Serialize)]
        struct TestStruct {
            a: i64,
            b: String,
        }

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer: Vec<u8> = Vec::new();
        let data = TestStruct {
            a: 27,
            b: "foo".to_string(),
        };

        let mut expected = Vec::new();
        zig_i64(27, &mut expected)?;
        zig_i64(3, &mut expected)?;
        expected.extend([b'f', b'o', b'o']);

        let bytes = GenericDatumWriter::builder(&schema)
            .build()?
            .write_ser(&mut writer, &data)?;

        assert_eq!(bytes, expected.len());
        assert_eq!(writer, expected);

        Ok(())
    }

    #[test]
    fn test_union_not_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(1, Box::new(Value::Long(3)));

        let mut expected = Vec::new();
        zig_i64(1, &mut expected)?;
        zig_i64(3, &mut expected)?;

        let written = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(union)?;
        assert_eq!(written, expected);

        Ok(())
    }

    #[test]
    fn test_union_null() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(0, Box::new(Value::Null));

        let mut expected = Vec::new();
        zig_i64(0, &mut expected)?;

        let written = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(union)?;
        assert_eq!(written, expected);

        Ok(())
    }

    /// A value that matches no branch of the union must be rejected, and no bytes may be
    /// produced.
    #[test]
    fn write_value_rejects_value_not_matching_schema() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;

        let result = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(Value::String("foo".to_string()));
        assert!(result.is_err());

        Ok(())
    }

    /// Disabling validation must not change the encoding of a valid value.
    #[test]
    fn write_value_without_validation() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let union = Value::Union(1, Box::new(Value::Long(3)));

        let mut expected = Vec::new();
        zig_i64(1, &mut expected)?;
        zig_i64(3, &mut expected)?;

        let written = GenericDatumWriter::builder(&schema)
            .validate(false)
            .build()?
            .write_value_to_vec(union)?;
        assert_eq!(written, expected);

        Ok(())
    }

    /// Checks that `schema_str` parses to `expected_schema`, and that `value` encodes to
    /// the same bytes as `raw_value` under `raw_schema`. It then checks that those bytes
    /// decode back to `value`.
    #[expect(
        clippy::needless_pass_by_value,
        reason = "Value does not implement PartialEq<&Value>"
    )]
    fn logical_type_test<T: Into<Value> + Clone>(
        schema_str: &'static str,

        expected_schema: &Schema,
        value: Value,

        raw_schema: &Schema,
        raw_value: T,
    ) -> TestResult {
        let schema = Schema::parse_str(schema_str)?;
        assert_eq!(&schema, expected_schema);
        let ser = GenericDatumWriter::builder(&schema)
            .build()?
            .write_value_to_vec(value.clone())?;
        let raw_ser = GenericDatumWriter::builder(raw_schema)
            .build()?
            .write_value_to_vec(raw_value)?;
        assert_eq!(ser, raw_ser);

        let mut r = ser.as_slice();
        let de = GenericDatumReader::builder(&schema)
            .build()?
            .read_value(&mut r)?;
        assert_eq!(de, value);
        Ok(())
    }

    #[test]
    fn date() -> TestResult {
        logical_type_test(
            r#"{"type": "int", "logicalType": "date"}"#,
            &Schema::Date,
            Value::Date(1_i32),
            &Schema::Int,
            1_i32,
        )
    }

    #[test]
    fn time_millis() -> TestResult {
        logical_type_test(
            r#"{"type": "int", "logicalType": "time-millis"}"#,
            &Schema::TimeMillis,
            Value::TimeMillis(1_i32),
            &Schema::Int,
            1_i32,
        )
    }

    #[test]
    fn time_micros() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "time-micros"}"#,
            &Schema::TimeMicros,
            Value::TimeMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn timestamp_millis() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
            &Schema::TimestampMillis,
            Value::TimestampMillis(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn timestamp_micros() -> TestResult {
        logical_type_test(
            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
            &Schema::TimestampMicros,
            Value::TimestampMicros(1_i64),
            &Schema::Long,
            1_i64,
        )
    }

    #[test]
    fn decimal_fixed() -> TestResult {
        let size = 30;
        let fixed = FixedSchema {
            name: Name::new("decimal")?,
            aliases: None,
            doc: None,
            size,
            attributes: Default::default(),
        };
        let inner = InnerDecimalSchema::Fixed(fixed.clone());
        let value = vec![0u8; size];
        logical_type_test(
            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
            &Schema::Decimal(DecimalSchema {
                precision: 20,
                scale: 5,
                inner,
            }),
            Value::Decimal(Decimal::from(value.clone())),
            &Schema::Fixed(fixed),
            Value::Fixed(size, value),
        )
    }

    #[test]
    fn decimal_bytes() -> TestResult {
        let value = vec![0u8; 10];
        logical_type_test(
            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
            &Schema::Decimal(DecimalSchema {
                precision: 4,
                scale: 3,
                inner: InnerDecimalSchema::Bytes,
            }),
            Value::Decimal(Decimal::from(value.clone())),
            &Schema::Bytes,
            value,
        )
    }

    #[test]
    fn duration() -> TestResult {
        // The duration logical type and its raw fixed(12) form share one fixed schema.
        let fixed = FixedSchema {
            name: Name::new("duration")?,
            aliases: None,
            doc: None,
            size: 12,
            attributes: Default::default(),
        };
        let value = Value::Duration(Duration::new(
            Months::new(256),
            Days::new(512),
            Millis::new(1024),
        ));
        logical_type_test(
            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
            &Schema::Duration(fixed.clone()),
            value,
            &Schema::Fixed(fixed),
            // Little-endian u32 months (256), days (512) and millis (1024).
            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
        )
    }
}