1use std::{io::Read, marker::PhantomData};
19
20use bon::bon;
21use serde::de::DeserializeOwned;
22
23use crate::{
24 AvroResult, AvroSchema, Schema,
25 decode::decode_internal,
26 schema::{ResolvedOwnedSchema, ResolvedSchema},
27 serde::deser_schema::{Config, SchemaAwareDeserializer},
28 types::Value,
29 util::is_human_readable,
30};
31
/// Reads Avro datums written with a known writer schema, optionally resolving the
/// decoded value against a reader schema.
///
/// Construct it with [`GenericDatumReader::builder`].
pub struct GenericDatumReader<'s> {
    /// Schema the datum was written with; used to decode the raw bytes.
    writer: &'s Schema,
    /// Resolved names passed to the decoder alongside `writer`.
    resolved: ResolvedSchema<'s>,
    /// Optional reader schema plus its resolved names; when present, decoded values are
    /// resolved against it.
    reader: Option<(&'s Schema, ResolvedSchema<'s>)>,
    /// Forwarded to the serde deserializer `Config` as its `human_readable` flag.
    human_readable: bool,
}
43
44#[bon]
45impl<'s> GenericDatumReader<'s> {
46 #[builder]
52 pub fn new(
53 #[builder(start_fn)]
55 writer_schema: &'s Schema,
56 resolved_writer_schemata: Option<ResolvedSchema<'s>>,
58 reader_schema: Option<&'s Schema>,
60 resolved_reader_schemata: Option<ResolvedSchema<'s>>,
62 #[builder(default = is_human_readable())]
64 human_readable: bool,
65 ) -> AvroResult<Self> {
66 let resolved_writer_schemata = if let Some(resolved) = resolved_writer_schemata {
67 resolved
68 } else {
69 ResolvedSchema::try_from(writer_schema)?
70 };
71
72 let reader = if let Some(reader) = reader_schema {
73 if let Some(resolved) = resolved_reader_schemata {
74 Some((reader, resolved))
75 } else {
76 Some((reader, ResolvedSchema::try_from(reader)?))
77 }
78 } else {
79 None
80 };
81
82 Ok(Self {
83 writer: writer_schema,
84 resolved: resolved_writer_schemata,
85 reader,
86 human_readable,
87 })
88 }
89}
90
91impl<'s, S: generic_datum_reader_builder::State> GenericDatumReaderBuilder<'s, S> {
92 pub fn writer_schemata(
97 self,
98 schemata: Vec<&'s Schema>,
99 ) -> AvroResult<
100 GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedWriterSchemata<S>>,
101 >
102 where
103 S::ResolvedWriterSchemata: generic_datum_reader_builder::IsUnset,
104 {
105 let resolved = ResolvedSchema::new_with_schemata(schemata)?;
106 Ok(self.resolved_writer_schemata(resolved))
107 }
108
109 pub fn reader_schemata(
116 self,
117 schemata: Vec<&'s Schema>,
118 ) -> AvroResult<
119 GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedReaderSchemata<S>>,
120 >
121 where
122 S::ResolvedReaderSchemata: generic_datum_reader_builder::IsUnset,
123 S::ReaderSchema: generic_datum_reader_builder::IsSet,
124 {
125 let resolved = ResolvedSchema::new_with_schemata(schemata)?;
126 Ok(self.resolved_reader_schemata(resolved))
127 }
128}
129
130impl<'s> GenericDatumReader<'s> {
131 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
133 let value = decode_internal(self.writer, self.resolved.get_names(), None, reader)?;
134 if let Some((reader, resolved)) = &self.reader {
135 value.resolve_internal(reader, resolved.get_names(), None, &None)
136 } else {
137 Ok(value)
138 }
139 }
140
141 pub fn read_deser<T: DeserializeOwned>(&self, reader: &mut impl Read) -> AvroResult<T> {
146 if let Some((_, _)) = &self.reader {
149 panic!("Schema aware deserialisation does not resolve schemas yet");
151 } else {
152 T::deserialize(SchemaAwareDeserializer::new(
153 reader,
154 self.writer,
155 Config {
156 names: self.resolved.get_names(),
157 human_readable: self.human_readable,
158 },
159 )?)
160 }
161 }
162}
163
/// Reads Avro datums directly into the Rust type `T`, using the schema provided by its
/// [`AvroSchema`] implementation.
pub struct SpecificDatumReader<T: AvroSchema> {
    /// Owned, resolved form of `T::get_schema()`.
    resolved: ResolvedOwnedSchema,
    /// Forwarded to the serde deserializer `Config` as its `human_readable` flag.
    human_readable: bool,
    /// Ties the reader to `T` without storing a value of it.
    phantom: PhantomData<T>,
}
174
175#[bon]
176impl<T: AvroSchema> SpecificDatumReader<T> {
177 #[builder]
183 pub fn new(#[builder(default = is_human_readable())] human_readable: bool) -> AvroResult<Self> {
184 Ok(Self {
185 resolved: T::get_schema().try_into()?,
186 human_readable,
187 phantom: PhantomData,
188 })
189 }
190}
191
192impl<T: AvroSchema + DeserializeOwned> SpecificDatumReader<T> {
193 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
194 T::deserialize(SchemaAwareDeserializer::new(
195 reader,
196 self.resolved.get_root_schema(),
197 Config {
198 names: self.resolved.get_names(),
199 human_readable: self.human_readable,
200 },
201 )?)
202 }
203}
204
205#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
224pub fn from_avro_datum<R: Read>(
225 writer_schema: &Schema,
226 reader: &mut R,
227 reader_schema: Option<&Schema>,
228) -> AvroResult<Value> {
229 GenericDatumReader::builder(writer_schema)
230 .maybe_reader_schema(reader_schema)
231 .build()?
232 .read_value(reader)
233}
234
235#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
253pub fn from_avro_datum_schemata<R: Read>(
254 writer_schema: &Schema,
255 writer_schemata: Vec<&Schema>,
256 reader: &mut R,
257 reader_schema: Option<&Schema>,
258) -> AvroResult<Value> {
259 GenericDatumReader::builder(writer_schema)
260 .writer_schemata(writer_schemata)?
261 .maybe_reader_schema(reader_schema)
262 .build()?
263 .read_value(reader)
264}
265
266#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
285pub fn from_avro_datum_reader_schemata<R: Read>(
286 writer_schema: &Schema,
287 writer_schemata: Vec<&Schema>,
288 reader: &mut R,
289 reader_schema: Option<&Schema>,
290 reader_schemata: Vec<&Schema>,
291) -> AvroResult<Value> {
292 GenericDatumReader::builder(writer_schema)
293 .writer_schemata(writer_schemata)?
294 .maybe_reader_schema(reader_schema)
295 .reader_schemata(reader_schemata)?
296 .build()?
297 .read_value(reader)
298}
299
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use serde::Deserialize;

    use crate::{
        Schema,
        reader::datum::GenericDatumReader,
        types::{Record, Value},
    };

    /// Decodes a two-field record with `read_value` and no reader schema.
    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(
            r#"{
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "a",
                    "type": "long",
                    "default": 42
                },
                {
                    "name": "b",
                    "type": "string"
                }
            ]
        }"#,
        )?;
        // 54 is the zig-zag varint for 27 (field `a`); 6 is the zig-zag varint for the
        // string length 3, followed by the bytes of "foo" (field `b`).
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        let avro_datum = GenericDatumReader::builder(&schema)
            .build()?
            .read_value(&mut encoded)?;

        assert_eq!(avro_datum, expected);

        Ok(())
    }

    /// `read_deser` into a struct reports an error (not a panic) when the input ends
    /// before the nullable union fields.
    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
        {
            "type": "record",
            "name": "TestRecord3240",
            "fields": [
                {
                    "name": "a",
                    "type": "long",
                    "default": 42
                },
                {
                    "name": "b",
                    "type": "string"
                },
                {
                    "name": "a_nullable_array",
                    "type": ["null", {"type": "array", "items": {"type": "string"}}],
                    "default": null
                },
                {
                    "name": "a_nullable_boolean",
                    "type": ["null", {"type": "boolean"}],
                    "default": null
                },
                {
                    "name": "a_nullable_string",
                    "type": ["null", {"type": "string"}],
                    "default": null
                }
            ]
        }
        "#;
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        // Same bytes as `test_from_avro_datum`: only fields `a` and `b` are encoded, so the
        // next varint read (the union index of `a_nullable_array`) hits end of input.
        let mut encoded: &[u8] = &[54, 6, 102, 111, 111];

        let error = GenericDatumReader::builder(&schema)
            .build()?
            .read_deser::<TestRecord3240>(&mut encoded)
            .unwrap_err();
        assert_eq!(
            error.to_string(),
            "Failed to read bytes for decoding variable length integer: failed to fill whole buffer"
        );

        Ok(())
    }

    /// Decodes a value of a top-level `["null", "long"]` union.
    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(r#"["null", "long"]"#)?;
        // 2 is the zig-zag varint for branch index 1 ("long"); 0 encodes the long value 0.
        let mut encoded: &'static [u8] = &[2, 0];

        let avro_datum = GenericDatumReader::builder(&schema)
            .build()?
            .read_value(&mut encoded)?;
        assert_eq!(avro_datum, Value::Union(1, Box::new(Value::Long(0))));

        Ok(())
    }
}