1mod block;
21pub mod single_object;
22
23use crate::{
24 AvroResult,
25 decode::{decode, decode_internal},
26 schema::{ResolvedSchema, Schema},
27 types::Value,
28};
29use block::Block;
30use bon::bon;
31use std::{collections::HashMap, io::Read};
32
33pub struct Reader<'a, R> {
49 block: Block<'a, R>,
50 reader_schema: Option<&'a Schema>,
51 errored: bool,
52 should_resolve_schema: bool,
53}
54
55#[bon]
56impl<'a, R: Read> Reader<'a, R> {
57 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
62 Reader::builder(reader).build()
63 }
64
65 #[builder(finish_fn = build)]
71 pub fn builder(
72 #[builder(start_fn)] reader: R,
73 reader_schema: Option<&'a Schema>,
74 schemata: Option<Vec<&'a Schema>>,
75 ) -> AvroResult<Reader<'a, R>> {
76 let schemata =
77 schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default());
78
79 let block = Block::new(reader, schemata)?;
80 let mut reader = Reader {
81 block,
82 reader_schema,
83 errored: false,
84 should_resolve_schema: false,
85 };
86 reader.should_resolve_schema =
88 reader_schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema);
89 Ok(reader)
90 }
91
92 #[inline]
94 pub fn writer_schema(&self) -> &Schema {
95 &self.block.writer_schema
96 }
97
98 #[inline]
100 pub fn reader_schema(&self) -> Option<&Schema> {
101 self.reader_schema
102 }
103
104 #[inline]
106 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
107 &self.block.user_metadata
108 }
109
110 #[inline]
111 fn read_next(&mut self) -> AvroResult<Option<Value>> {
112 let read_schema = if self.should_resolve_schema {
113 self.reader_schema
114 } else {
115 None
116 };
117
118 self.block.read_next(read_schema)
119 }
120}
121
122impl<R: Read> Iterator for Reader<'_, R> {
123 type Item = AvroResult<Value>;
124
125 fn next(&mut self) -> Option<Self::Item> {
126 if self.errored {
128 return None;
129 };
130 match self.read_next() {
131 Ok(opt) => opt.map(Ok),
132 Err(e) => {
133 self.errored = true;
134 Some(Err(e))
135 }
136 }
137 }
138}
139
140pub fn from_avro_datum<R: Read>(
149 writer_schema: &Schema,
150 reader: &mut R,
151 reader_schema: Option<&Schema>,
152) -> AvroResult<Value> {
153 let value = decode(writer_schema, reader)?;
154 match reader_schema {
155 Some(schema) => value.resolve(schema),
156 None => Ok(value),
157 }
158}
159
160pub fn from_avro_datum_schemata<R: Read>(
167 writer_schema: &Schema,
168 writer_schemata: Vec<&Schema>,
169 reader: &mut R,
170 reader_schema: Option<&Schema>,
171) -> AvroResult<Value> {
172 from_avro_datum_reader_schemata(
173 writer_schema,
174 writer_schemata,
175 reader,
176 reader_schema,
177 Vec::with_capacity(0),
178 )
179}
180
181pub fn from_avro_datum_reader_schemata<R: Read>(
188 writer_schema: &Schema,
189 writer_schemata: Vec<&Schema>,
190 reader: &mut R,
191 reader_schema: Option<&Schema>,
192 reader_schemata: Vec<&Schema>,
193) -> AvroResult<Value> {
194 let rs = ResolvedSchema::try_from(writer_schemata)?;
195 let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
196 match reader_schema {
197 Some(schema) => {
198 if reader_schemata.is_empty() {
199 value.resolve(schema)
200 } else {
201 value.resolve_schemata(schema, reader_schemata)
202 }
203 }
204 None => Ok(value),
205 }
206}
207
/// Returns the 16-byte sync marker taken from the last 16 bytes of `bytes`.
///
/// # Panics
///
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    const MARKER_LEN: usize = 16;
    // Exactly 16 bytes is enough to hold a marker, so only shorter inputs are rejected.
    assert!(
        bytes.len() >= MARKER_LEN,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; MARKER_LEN];
    marker.copy_from_slice(&bytes[bytes.len() - MARKER_LEN..]);
    marker
}
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::from_value;
    use crate::types::Record;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use serde::Deserialize;
    use std::io::Cursor;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// A complete object container: 163-byte header (magic, metadata, sync marker),
    /// one block of two records, and the trailing sync marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_from_avro_datum() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(from_avro_datum(&schema, &mut encoded, None)?, expected);

        Ok(())
    }

    #[test]
    fn test_from_avro_datum_with_union_to_struct() -> TestResult {
        const TEST_RECORD_SCHEMA_3240: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        },
        {
            "name": "a_nullable_array",
            "type": ["null", {"type": "array", "items": {"type": "string"}}],
            "default": null
        },
        {
            "name": "a_nullable_boolean",
            "type": ["null", {"type": "boolean"}],
            "default": null
        },
        {
            "name": "a_nullable_string",
            "type": ["null", {"type": "string"}],
            "default": null
        }
      ]
    }
    "#;
        // The schema's `a_nullable_boolean` field has no counterpart in this struct.
        #[derive(Default, Debug, Deserialize, PartialEq, Eq)]
        struct TestRecord3240 {
            a: i64,
            b: String,
            a_nullable_array: Option<Vec<String>>,
            a_nullable_string: Option<String>,
        }

        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240)?;
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let expected_record: TestRecord3240 = TestRecord3240 {
            a: 27i64,
            b: String::from("foo"),
            a_nullable_array: None,
            a_nullable_string: None,
        };

        let avro_datum = from_avro_datum(&schema, &mut encoded, None)?;
        let parsed_record: TestRecord3240 = match &avro_datum {
            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum)?,
            unexpected => {
                panic!("could not map avro data to struct, found unexpected: {unexpected:?}")
            }
        };

        assert_eq!(parsed_record, expected_record);

        Ok(())
    }

    #[test]
    fn test_null_union() -> TestResult {
        let schema = Schema::parse_str(UNION_SCHEMA)?;
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None)?,
            Value::Union(1, Box::new(Value::Long(0)))
        );

        Ok(())
    }

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::builder(ENCODED).reader_schema(&schema).build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Compare the whole sequence so that yielding too few or too many values fails
        // the test instead of passing silently or panicking on an index.
        let values = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(values, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        assert!(
            Reader::builder(&mut invalid)
                .reader_schema(&schema)
                .build()
                .is_err()
        );

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::builder(&mut invalid)
            .reader_schema(&schema)
            .build()?;
        let results: Vec<AvroResult<Value>> = reader.collect();
        assert!(
            !results.is_empty(),
            "expected the truncated block to yield an error"
        );
        assert!(results.iter().all(Result::is_err));

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        // The header plus the block's count and size, but none of the block data.
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;
        let results: Vec<AvroResult<Value>> = reader.collect();
        assert!(
            !results.is_empty(),
            "expected the missing block data to yield an error"
        );
        assert!(results.iter().all(Result::is_err));

        Ok(())
    }

    #[test]
    fn test_read_marker() {
        let expected: [u8; 16] = [
            94, 61, 54, 221, 190, 207, 108, 180, 158, 57, 114, 40, 173, 199, 228, 239,
        ];
        // The header ends with the sync marker ...
        assert_eq!(read_marker(&ENCODED[..163]), expected);
        // ... and the same marker terminates the data block.
        assert_eq!(read_marker(ENCODED), expected);
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
        } else {
            panic!("Expected an error in the reading of the codec!");
        }
    }
}