1mod block;
21pub mod datum;
22pub mod single_object;
23
24use std::{collections::HashMap, io::Read, marker::PhantomData};
25
26use block::Block;
27use bon::bon;
28use serde::de::DeserializeOwned;
29
30use crate::{AvroResult, schema::Schema, types::Value, util::is_human_readable};
31
/// Reader that decodes Avro data pulled from an underlying source `R`.
///
/// Decoded `Value`s are produced through the `Iterator` implementation;
/// `into_deser_iter` switches to yielding deserialized `T`s instead.
pub struct Reader<'a, R> {
    /// Block-level decoder built from the underlying reader; it also holds the
    /// writer schema and the user metadata exposed by the accessors.
    block: Block<'a, R>,
    /// Schema supplied by the caller for reading, if any.
    reader_schema: Option<&'a Schema>,
    /// Set once a read fails; from then on the iterators only yield `None`.
    errored: bool,
    /// `true` when a reader schema was supplied and it differs from the writer schema.
    should_resolve_schema: bool,
}
53
54#[bon]
55impl<'a, R: Read> Reader<'a, R> {
56 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
61 Reader::builder(reader).build()
62 }
63
64 #[builder(finish_fn = build)]
70 pub fn builder(
71 #[builder(start_fn)] reader: R,
72 reader_schema: Option<&'a Schema>,
77 schemata: Option<Vec<&'a Schema>>,
78 #[builder(default = is_human_readable())] human_readable: bool,
79 ) -> AvroResult<Reader<'a, R>> {
80 let schemata =
81 schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default());
82
83 let block = Block::new(reader, schemata, human_readable)?;
84 let mut reader = Reader {
85 block,
86 reader_schema,
87 errored: false,
88 should_resolve_schema: false,
89 };
90 reader.should_resolve_schema =
92 reader_schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema);
93 Ok(reader)
94 }
95
96 #[inline]
98 pub fn writer_schema(&self) -> &Schema {
99 &self.block.writer_schema
100 }
101
102 #[inline]
104 pub fn reader_schema(&self) -> Option<&Schema> {
105 self.reader_schema
106 }
107
108 #[inline]
110 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
111 &self.block.user_metadata
112 }
113
114 pub fn into_deser_iter<T: DeserializeOwned>(self) -> ReaderDeser<'a, R, T> {
116 ReaderDeser {
117 inner: self,
118 phantom: PhantomData,
119 }
120 }
121
122 #[inline]
123 fn read_next(&mut self) -> AvroResult<Option<Value>> {
124 let read_schema = if self.should_resolve_schema {
125 self.reader_schema
126 } else {
127 None
128 };
129
130 self.block.read_next(read_schema)
131 }
132
133 fn read_next_deser<T: DeserializeOwned>(&mut self) -> AvroResult<Option<T>> {
134 assert!(
136 !self.should_resolve_schema,
137 "Schema aware deserialisation does not resolve schemas yet"
138 );
139
140 self.block.read_next_deser(self.reader_schema)
141 }
142}
143
144impl<R: Read> Iterator for Reader<'_, R> {
145 type Item = AvroResult<Value>;
146
147 fn next(&mut self) -> Option<Self::Item> {
148 if self.errored {
150 return None;
151 };
152 match self.read_next() {
153 Ok(opt) => opt.map(Ok),
154 Err(e) => {
155 self.errored = true;
156 Some(Err(e))
157 }
158 }
159 }
160}
161
/// Iterator adapter over a `Reader` that yields deserialized `T` values;
/// created by `Reader::into_deser_iter`.
pub struct ReaderDeser<'a, R, T> {
    /// The wrapped reader that performs the actual reads.
    inner: Reader<'a, R>,
    /// Records the target type `T` without storing a value of it.
    phantom: PhantomData<T>,
}
167
168impl<R: Read, T: DeserializeOwned> Iterator for ReaderDeser<'_, R, T> {
169 type Item = AvroResult<T>;
170
171 fn next(&mut self) -> Option<Self::Item> {
172 if self.inner.errored {
174 return None;
175 }
176 match self.inner.read_next_deser::<T>() {
177 Ok(opt) => opt.map(Ok),
178 Err(e) => {
179 self.inner.errored = true;
180 Some(Err(e))
181 }
182 }
183 }
184}
185
/// Returns the last 16 bytes of `bytes` as a fixed-size marker array.
///
/// # Panics
/// Panics if `bytes` holds fewer than 16 bytes.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    const MARKER_LEN: usize = 16;

    // Exactly `MARKER_LEN` bytes is enough: the whole slice is then the marker.
    assert!(
        bytes.len() >= MARKER_LEN,
        "The bytes are too short to read a marker from them"
    );

    let mut marker = [0_u8; MARKER_LEN];
    // `u8` is `Copy`, so this is a plain memcpy of the trailing bytes.
    marker.copy_from_slice(&bytes[bytes.len() - MARKER_LEN..]);
    marker
}
199
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::types::Record;

    // Record schema shared by the tests: a `long` field `a` (default 42) and a
    // `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    // Encoded input for the reader tests. The bytes start with `Obj\x01`,
    // carry `avro.schema` (the compact JSON form of `SCHEMA`) and `avro.codec`
    // (`null`) metadata followed by a 16-byte marker, and end with one block
    // holding two records, (27, "foo") and (42, "bar"), closed by the same marker.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    // Reading `ENCODED` must yield exactly the two expected records, in order.
    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::builder(ENCODED).reader_schema(&schema).build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Collect everything first: comparing inside a loop would pass
        // vacuously if the reader produced fewer records than expected.
        let actual = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(actual, expected);

        Ok(())
    }

    // Dropping the first byte corrupts the start of the input, so building
    // the reader must fail.
    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        let built = Reader::builder(&mut invalid)
            .reader_schema(&schema)
            .build();
        assert!(built.is_err());

        Ok(())
    }

    // With the last 19 bytes cut off, the reader still builds, but it must
    // not yield any successfully decoded value.
    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::builder(&mut invalid)
            .reader_schema(&schema)
            .build()?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    // An empty input cannot be turned into a reader.
    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    // With only the first 165 bytes available, the reader builds but must not
    // yield any successfully decoded value.
    #[test]
    fn test_reader_only_header() -> TestResult {
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;
        for value in reader {
            assert!(value.is_err());
        }

        Ok(())
    }

    // User metadata added through the writer must be returned unchanged by
    // `Reader::user_metadata`.
    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in &user_meta_data {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        // Last use of `record`, so it can be moved instead of cloned again.
        writer.append_value(record)?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    // Without the `snappy` feature, input whose `avro.codec` metadata is
    // `snappy` must be rejected with a descriptive error.
    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) else {
            panic!("Expected an error in the reading of the codec!");
        };
        assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
    }
}