1use std::io::Read;
19
20use bon::bon;
21
22use crate::{AvroResult, Schema, decode::decode_internal, schema::ResolvedSchema, types::Value};
23
/// Reader for single Avro datums.
///
/// Holds the writer schema (with its resolved schemata) used for decoding and,
/// optionally, a reader schema (with its resolved schemata) that decoded values
/// are resolved against. Instances are created through
/// `GenericDatumReader::builder`.
pub struct GenericDatumReader<'s> {
    /// Schema passed to `decode_internal` when decoding a datum.
    writer: &'s Schema,
    /// Resolved schemata whose names are supplied to `decode_internal`
    /// alongside `writer`.
    resolved: ResolvedSchema<'s>,
    /// Reader schema and its resolved schemata; when `Some`, every decoded
    /// value is resolved against it before being returned.
    reader: Option<(&'s Schema, ResolvedSchema<'s>)>,
}
34
35#[bon]
36impl<'s> GenericDatumReader<'s> {
37 #[builder]
43 pub fn new(
44 #[builder(start_fn)]
46 writer_schema: &'s Schema,
47 resolved_writer_schemata: Option<ResolvedSchema<'s>>,
49 reader_schema: Option<&'s Schema>,
51 resolved_reader_schemata: Option<ResolvedSchema<'s>>,
53 ) -> AvroResult<Self> {
54 let resolved_writer_schemata = if let Some(resolved) = resolved_writer_schemata {
55 resolved
56 } else {
57 ResolvedSchema::try_from(writer_schema)?
58 };
59
60 let reader = if let Some(reader) = reader_schema {
61 if let Some(resolved) = resolved_reader_schemata {
62 Some((reader, resolved))
63 } else {
64 Some((reader, ResolvedSchema::try_from(reader)?))
65 }
66 } else {
67 None
68 };
69
70 Ok(Self {
71 writer: writer_schema,
72 resolved: resolved_writer_schemata,
73 reader,
74 })
75 }
76}
77
78impl<'s, S: generic_datum_reader_builder::State> GenericDatumReaderBuilder<'s, S> {
79 pub fn writer_schemata(
84 self,
85 schemata: Vec<&'s Schema>,
86 ) -> AvroResult<
87 GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedWriterSchemata<S>>,
88 >
89 where
90 S::ResolvedWriterSchemata: generic_datum_reader_builder::IsUnset,
91 {
92 let resolved = ResolvedSchema::new_with_schemata(schemata)?;
93 Ok(self.resolved_writer_schemata(resolved))
94 }
95
96 pub fn reader_schemata(
103 self,
104 schemata: Vec<&'s Schema>,
105 ) -> AvroResult<
106 GenericDatumReaderBuilder<'s, generic_datum_reader_builder::SetResolvedReaderSchemata<S>>,
107 >
108 where
109 S::ResolvedReaderSchemata: generic_datum_reader_builder::IsUnset,
110 S::ReaderSchema: generic_datum_reader_builder::IsSet,
111 {
112 let resolved = ResolvedSchema::new_with_schemata(schemata)?;
113 Ok(self.resolved_reader_schemata(resolved))
114 }
115}
116
117impl<'s> GenericDatumReader<'s> {
118 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
120 let value = decode_internal(self.writer, self.resolved.get_names(), None, reader)?;
121 if let Some((reader, resolved)) = &self.reader {
122 value.resolve_internal(reader, resolved.get_names(), None, &None)
123 } else {
124 Ok(value)
125 }
126 }
127}
128
129#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
148pub fn from_avro_datum<R: Read>(
149 writer_schema: &Schema,
150 reader: &mut R,
151 reader_schema: Option<&Schema>,
152) -> AvroResult<Value> {
153 GenericDatumReader::builder(writer_schema)
154 .maybe_reader_schema(reader_schema)
155 .build()?
156 .read_value(reader)
157}
158
159#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
177pub fn from_avro_datum_schemata<R: Read>(
178 writer_schema: &Schema,
179 writer_schemata: Vec<&Schema>,
180 reader: &mut R,
181 reader_schema: Option<&Schema>,
182) -> AvroResult<Value> {
183 GenericDatumReader::builder(writer_schema)
184 .writer_schemata(writer_schemata)?
185 .maybe_reader_schema(reader_schema)
186 .build()?
187 .read_value(reader)
188}
189
190#[deprecated(since = "0.22.0", note = "Use `GenericDatumReader` instead")]
209pub fn from_avro_datum_reader_schemata<R: Read>(
210 writer_schema: &Schema,
211 writer_schemata: Vec<&Schema>,
212 reader: &mut R,
213 reader_schema: Option<&Schema>,
214 reader_schemata: Vec<&Schema>,
215) -> AvroResult<Value> {
216 GenericDatumReader::builder(writer_schema)
217 .writer_schemata(writer_schemata)?
218 .maybe_reader_schema(reader_schema)
219 .reader_schemata(reader_schemata)?
220 .build()?
221 .read_value(reader)
222}
223
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use serde::Deserialize;

    use crate::{
        Schema, from_value,
        reader::datum::GenericDatumReader,
        types::{Record, Value},
    };

    /// A record with a long and a string field decodes to the matching `Record` value.
    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(
            r#"{
                "type": "record",
                "name": "test",
                "fields": [
                    {
                        "name": "a",
                        "type": "long",
                        "default": 42
                    },
                    {
                        "name": "b",
                        "type": "string"
                    }
                ]
            }"#,
        )?;

        let mut expected_record = Record::new(&schema).unwrap();
        expected_record.put("a", 27i64);
        expected_record.put("b", "foo");
        let expected: Value = expected_record.into();

        // Encoded form of a = 27 followed by b = "foo".
        let mut input: &'static [u8] = &[54, 6, 102, 111, 111];
        let datum_reader = GenericDatumReader::builder(&schema).build()?;
        let actual = datum_reader.read_value(&mut input)?;

        assert_eq!(actual, expected);

        Ok(())
    }

    /// A decoded record with nullable union fields can be mapped onto a struct
    /// via `from_value`.
    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
        {
            "type": "record",
            "name": "test",
            "fields": [
                {
                    "name": "a",
                    "type": "long",
                    "default": 42
                },
                {
                    "name": "b",
                    "type": "string"
                },
                {
                    "name": "a_nullable_array",
                    "type": ["null", {"type": "array", "items": {"type": "string"}}],
                    "default": null
                },
                {
                    "name": "a_nullable_boolean",
                    "type": ["null", {"type": "boolean"}],
                    "default": null
                },
                {
                    "name": "a_nullable_string",
                    "type": ["null", {"type": "string"}],
                    "default": null
                }
            ]
        }
        "#;

        // Note: the struct has no `a_nullable_boolean` field although the
        // schema declares one.
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
        }

        let expected = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        // Only `a` and `b` are present in the input; the nullable fields are
        // expected to come out as `None`.
        let mut input: &'static [u8] = &[54, 6, 102, 111, 111];

        let datum_reader = GenericDatumReader::builder(&schema).build()?;
        let datum = datum_reader.read_value(&mut input)?;

        let parsed: TestRecord3240 = match &datum {
            record @ Value::Record(_) => from_value::<TestRecord3240>(record)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed, expected);

        Ok(())
    }

    /// A `["null", "long"]` union whose encoded branch index is 1 decodes to
    /// the long branch.
    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(r#"["null", "long"]"#)?;
        let mut input: &'static [u8] = &[2, 0];

        let datum_reader = GenericDatumReader::builder(&schema).build()?;
        let datum = datum_reader.read_value(&mut input)?;

        let expected = Value::Union(1, Box::new(Value::Long(0)));
        assert_eq!(datum, expected);

        Ok(())
    }
}