1mod block;
21pub mod datum;
22pub mod single_object;
23
24use crate::{AvroResult, schema::Schema, types::Value};
25use block::Block;
26use bon::bon;
27use std::{collections::HashMap, io::Read};
28
29pub struct Reader<'a, R> {
45 block: Block<'a, R>,
46 reader_schema: Option<&'a Schema>,
47 errored: bool,
48 should_resolve_schema: bool,
49}
50
51#[bon]
52impl<'a, R: Read> Reader<'a, R> {
53 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
58 Reader::builder(reader).build()
59 }
60
61 #[builder(finish_fn = build)]
67 pub fn builder(
68 #[builder(start_fn)] reader: R,
69 reader_schema: Option<&'a Schema>,
70 schemata: Option<Vec<&'a Schema>>,
71 ) -> AvroResult<Reader<'a, R>> {
72 let schemata =
73 schemata.unwrap_or_else(|| reader_schema.map(|rs| vec![rs]).unwrap_or_default());
74
75 let block = Block::new(reader, schemata)?;
76 let mut reader = Reader {
77 block,
78 reader_schema,
79 errored: false,
80 should_resolve_schema: false,
81 };
82 reader.should_resolve_schema =
84 reader_schema.is_some_and(|reader_schema| reader.writer_schema() != reader_schema);
85 Ok(reader)
86 }
87
88 #[inline]
90 pub fn writer_schema(&self) -> &Schema {
91 &self.block.writer_schema
92 }
93
94 #[inline]
96 pub fn reader_schema(&self) -> Option<&Schema> {
97 self.reader_schema
98 }
99
100 #[inline]
102 pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
103 &self.block.user_metadata
104 }
105
106 #[inline]
107 fn read_next(&mut self) -> AvroResult<Option<Value>> {
108 let read_schema = if self.should_resolve_schema {
109 self.reader_schema
110 } else {
111 None
112 };
113
114 self.block.read_next(read_schema)
115 }
116}
117
118impl<R: Read> Iterator for Reader<'_, R> {
119 type Item = AvroResult<Value>;
120
121 fn next(&mut self) -> Option<Self::Item> {
122 if self.errored {
124 return None;
125 };
126 match self.read_next() {
127 Ok(opt) => opt.map(Ok),
128 Err(e) => {
129 self.errored = true;
130 Some(Err(e))
131 }
132 }
133 }
134}
135
/// Returns the last 16 bytes of `bytes` as a fixed-size marker array.
///
/// # Panics
///
/// Panics if `bytes` is shorter than 16 bytes. A slice of exactly 16 bytes is
/// accepted: it contains a full marker.
pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
    assert!(
        bytes.len() >= 16,
        "The bytes are too short to read a marker from them"
    );
    let mut marker = [0_u8; 16];
    // `u8` is `Copy`, so a plain memcpy is enough.
    marker.copy_from_slice(&bytes[bytes.len() - 16..]);
    marker
}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Record;
    use apache_avro_test_helper::TestResult;
    use pretty_assertions::assert_eq;
    use std::io::Cursor;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];

    #[test]
    fn test_reader_iterator() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let reader = Reader::builder(ENCODED).reader_schema(&schema).build()?;

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];

        // Comparing whole vectors also checks the record count: the test fails if the
        // reader yields fewer (even zero) or more records than expected, instead of
        // passing vacuously or panicking on an out-of-bounds index.
        let actual = reader.collect::<AvroResult<Vec<Value>>>()?;
        assert_eq!(actual, expected);

        Ok(())
    }

    #[test]
    fn test_reader_invalid_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut invalid = &ENCODED[1..];
        assert!(
            Reader::builder(&mut invalid)
                .reader_schema(&schema)
                .build()
                .is_err()
        );

        Ok(())
    }

    #[test]
    fn test_reader_invalid_block() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        // Drops the trailing sync marker and the last 3 payload bytes of the only block.
        let mut invalid = &ENCODED[0..ENCODED.len() - 19];
        let reader = Reader::builder(&mut invalid)
            .reader_schema(&schema)
            .build()?;

        let results: Vec<_> = reader.collect();
        assert!(
            !results.is_empty(),
            "a truncated block must surface an error, not an empty stream"
        );
        assert!(results.iter().all(Result::is_err));

        Ok(())
    }

    #[test]
    fn test_reader_empty_buffer() -> TestResult {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());

        Ok(())
    }

    #[test]
    fn test_reader_only_header() -> TestResult {
        // Keeps the full header plus only the block's count/size prefix, no payload.
        let mut invalid = &ENCODED[..165];
        let reader = Reader::new(&mut invalid)?;

        let results: Vec<_> = reader.collect();
        assert!(
            !results.is_empty(),
            "a block without payload must surface an error, not an empty stream"
        );
        assert!(results.iter().all(Result::is_err));

        Ok(())
    }

    #[test]
    fn test_avro_3405_read_user_metadata_success() -> TestResult {
        use crate::writer::Writer;

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
        user_meta_data.insert(
            "stringKey".to_string(),
            "stringValue".to_string().into_bytes(),
        );
        user_meta_data.insert("bytesKey".to_string(), b"bytesValue".to_vec());
        user_meta_data.insert("vecKey".to_string(), vec![1, 2, 3]);

        for (k, v) in user_meta_data.iter() {
            writer.add_user_metadata(k.to_string(), v)?;
        }

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        writer.append_value(record.clone())?;
        writer.append_value(record.clone())?;
        writer.flush()?;
        let result = writer.into_inner()?;

        let reader = Reader::new(&result[..])?;
        assert_eq!(reader.user_metadata(), &user_meta_data);

        Ok(())
    }

    #[cfg(not(feature = "snappy"))]
    #[test]
    fn test_avro_3549_read_not_enabled_codec() {
        let snappy_compressed_avro = vec![
            79, 98, 106, 1, 4, 22, 97, 118, 114, 111, 46, 115, 99, 104, 101, 109, 97, 210, 1, 123,
            34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 110, 97, 109, 101, 34, 58, 34,
            110, 117, 109, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110,
            103, 34, 125, 93, 44, 34, 110, 97, 109, 101, 34, 58, 34, 101, 118, 101, 110, 116, 34,
            44, 34, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 101, 120, 97, 109, 112,
            108, 101, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 44, 34, 116, 121, 112, 101, 34,
            58, 34, 114, 101, 99, 111, 114, 100, 34, 125, 20, 97, 118, 114, 111, 46, 99, 111, 100,
            101, 99, 12, 115, 110, 97, 112, 112, 121, 0, 213, 209, 241, 208, 200, 110, 164, 47,
            203, 25, 90, 235, 161, 167, 195, 177, 2, 20, 4, 12, 6, 49, 50, 51, 115, 38, 58, 0, 213,
            209, 241, 208, 200, 110, 164, 47, 203, 25, 90, 235, 161, 167, 195, 177,
        ];

        if let Err(err) = Reader::new(snappy_compressed_avro.as_slice()) {
            assert_eq!("Codec 'snappy' is not supported/enabled", err.to_string());
        } else {
            panic!("Expected an error in the reading of the codec!");
        }
    }
}