1use std::{io::Read, marker::PhantomData};
19
20use bon::bon;
21use serde::de::DeserializeOwned;
22
23use crate::{
24 AvroResult, AvroSchema, Schema,
25 decode::decode_internal,
26 error::Details,
27 headers::{HeaderBuilder, RabinFingerprintHeader},
28 schema::ResolvedOwnedSchema,
29 serde::deser_schema::{Config, SchemaAwareDeserializer},
30 types::Value,
31 util::is_human_readable,
32};
33
/// Reader for values that are prefixed by a fixed header.
///
/// The bytes at the start of the input must equal `expected_header`; only then is the
/// remainder decoded against `write_schema`.
pub struct GenericSingleObjectReader {
    /// Resolved form of the schema used to decode the data that follows the header.
    write_schema: ResolvedOwnedSchema,
    /// Exact bytes every input must start with.
    expected_header: Vec<u8>,
    /// Forwarded to the deserializer configuration used by `read_deser`.
    human_readable: bool,
}
39
40#[bon]
41impl GenericSingleObjectReader {
42 #[builder]
43 pub fn new(
44 schema: Schema,
45 #[builder(default = RabinFingerprintHeader::from_schema(&schema).build_header())]
47 header: Vec<u8>,
48 #[builder(default = is_human_readable())]
50 human_readable: bool,
51 ) -> AvroResult<GenericSingleObjectReader> {
52 Ok(Self {
53 write_schema: schema.try_into()?,
54 expected_header: header,
55 human_readable,
56 })
57 }
58}
59
60impl GenericSingleObjectReader {
61 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
62 self.read_header(reader)?;
63 decode_internal(
64 self.write_schema.get_root_schema(),
65 self.write_schema.get_names(),
66 None,
67 reader,
68 )
69 }
70
71 pub fn read_deser<T: DeserializeOwned>(&self, reader: &mut impl Read) -> AvroResult<T> {
72 self.read_header(reader)?;
73 let config = Config {
74 names: self.write_schema.get_names(),
75 human_readable: self.human_readable,
76 };
77 T::deserialize(SchemaAwareDeserializer::new(
78 reader,
79 self.write_schema.get_root_schema(),
80 config,
81 )?)
82 }
83
84 fn read_header(&self, reader: &mut impl Read) -> AvroResult<()> {
85 let mut header = vec![0; self.expected_header.len()];
86 reader
87 .read_exact(&mut header)
88 .map_err(Details::ReadHeader)?;
89 if self.expected_header == header {
90 Ok(())
91 } else {
92 Err(Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header).into())
93 }
94 }
95}
96
/// Typed counterpart of [`GenericSingleObjectReader`] for a type `T` implementing
/// [`AvroSchema`].
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Reader that performs the header check and the decoding.
    inner: GenericSingleObjectReader,
    /// Zero-sized marker tying the reader to `T` without storing a value of it.
    _model: PhantomData<T>,
}
104
105impl<T> SpecificSingleObjectReader<T>
106where
107 T: AvroSchema,
108{
109 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
110 Ok(SpecificSingleObjectReader {
111 inner: GenericSingleObjectReader::builder()
112 .schema(T::get_schema())
113 .build()?,
114 _model: PhantomData,
115 })
116 }
117}
118
119impl<T> SpecificSingleObjectReader<T>
120where
121 T: AvroSchema + From<Value>,
122{
123 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
124 self.inner.read_value(reader).map(|v| v.into())
125 }
126}
127
128impl<T> SpecificSingleObjectReader<T>
129where
130 T: AvroSchema + DeserializeOwned,
131{
132 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
133 self.inner.read_deser(reader)
134 }
135}
136
137#[cfg(test)]
138mod tests {
139 use apache_avro_test_helper::TestResult;
140 use serde::Deserialize;
141 use uuid::Uuid;
142
143 use super::*;
144 use crate::{AvroSchema, Schema, encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin};
145
    /// Test fixture with one long, one double and one string-array field.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
152
153 impl AvroSchema for TestSingleObjectReader {
154 fn get_schema() -> Schema {
155 let schema = r#"
156 {
157 "type":"record",
158 "name":"TestSingleObjectReader",
159 "fields":[
160 {
161 "name":"a",
162 "type":"long"
163 },
164 {
165 "name":"b",
166 "type":"double"
167 },
168 {
169 "name":"c",
170 "type":{
171 "type":"array",
172 "items":"string"
173 }
174 }
175 ]
176 }
177 "#;
178 Schema::parse_str(schema).unwrap()
179 }
180 }
181
182 impl From<Value> for TestSingleObjectReader {
183 fn from(obj: Value) -> TestSingleObjectReader {
184 if let Value::Record(fields) = obj {
185 let mut a = None;
186 let mut b = None;
187 let mut c = vec![];
188 for (field_name, v) in fields {
189 match (field_name.as_str(), v) {
190 ("a", Value::Long(i)) => a = Some(i),
191 ("b", Value::Double(d)) => b = Some(d),
192 ("c", Value::Array(v)) => {
193 for inner_val in v {
194 if let Value::String(s) = inner_val {
195 c.push(s);
196 }
197 }
198 }
199 (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
200 }
201 }
202 TestSingleObjectReader {
203 a: a.unwrap(),
204 b: b.unwrap(),
205 c,
206 }
207 } else {
208 panic!("Expected a Value::Record but was {obj:?}")
209 }
210 }
211 }
212
213 impl From<TestSingleObjectReader> for Value {
214 fn from(obj: TestSingleObjectReader) -> Value {
215 Value::Record(vec![
216 ("a".into(), obj.a.into()),
217 ("b".into(), obj.b.into()),
218 (
219 "c".into(),
220 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
221 ),
222 ])
223 }
224 }
225
226 #[test]
227 fn test_avro_3507_single_object_reader() -> TestResult {
228 let obj = TestSingleObjectReader {
229 a: 42,
230 b: 3.33,
231 c: vec!["cat".into(), "dog".into()],
232 };
233 let mut to_read = Vec::<u8>::new();
234 to_read.extend_from_slice(&[0xC3, 0x01]);
235 to_read.extend_from_slice(
236 &TestSingleObjectReader::get_schema()
237 .fingerprint::<Rabin>()
238 .bytes[..],
239 );
240 encode(
241 &obj.clone().into(),
242 &TestSingleObjectReader::get_schema(),
243 &mut to_read,
244 )
245 .expect("Encode should succeed");
246 let mut to_read = &to_read[..];
247 let generic_reader = GenericSingleObjectReader::builder()
248 .schema(TestSingleObjectReader::get_schema())
249 .build()
250 .expect("Schema should resolve");
251 let val = generic_reader
252 .read_value(&mut to_read)
253 .expect("Should read");
254 let expected_value: Value = obj.into();
255 pretty_assertions::assert_eq!(expected_value, val);
256
257 Ok(())
258 }
259
260 #[test]
261 fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
262 let obj = TestSingleObjectReader {
263 a: 42,
264 b: 3.33,
265 c: vec!["cat".into(), "dog".into()],
266 };
267 let to_read_1 = [0xC3, 0x01];
269 let mut to_read_2 = Vec::<u8>::new();
270 to_read_2.extend_from_slice(
271 &TestSingleObjectReader::get_schema()
272 .fingerprint::<Rabin>()
273 .bytes[..],
274 );
275 let mut to_read_3 = Vec::<u8>::new();
276 encode(
277 &obj.clone().into(),
278 &TestSingleObjectReader::get_schema(),
279 &mut to_read_3,
280 )
281 .expect("Encode should succeed");
282 let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
283 let generic_reader = GenericSingleObjectReader::builder()
284 .schema(TestSingleObjectReader::get_schema())
285 .build()
286 .expect("Schema should resolve");
287 let val = generic_reader
288 .read_value(&mut to_read)
289 .expect("Should read");
290 let expected_value: Value = obj.into();
291 pretty_assertions::assert_eq!(expected_value, val);
292
293 Ok(())
294 }
295
296 #[test]
297 fn test_avro_3507_reader_parity() -> TestResult {
298 let obj = TestSingleObjectReader {
299 a: 42,
300 b: 3.33,
301 c: vec!["cat".into(), "dog".into()],
302 };
303
304 let mut to_read = Vec::<u8>::new();
305 to_read.extend_from_slice(&[0xC3, 0x01]);
306 to_read.extend_from_slice(
307 &TestSingleObjectReader::get_schema()
308 .fingerprint::<Rabin>()
309 .bytes[..],
310 );
311 encode(
312 &obj.clone().into(),
313 &TestSingleObjectReader::get_schema(),
314 &mut to_read,
315 )
316 .expect("Encode should succeed");
317 let generic_reader = GenericSingleObjectReader::builder()
318 .schema(TestSingleObjectReader::get_schema())
319 .build()
320 .expect("Schema should resolve");
321 let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
322 .expect("schema should resolve");
323 let mut to_read1 = &to_read[..];
324 let mut to_read2 = &to_read[..];
325 let mut to_read3 = &to_read[..];
326
327 let val = generic_reader
328 .read_value(&mut to_read1)
329 .expect("Should read");
330 let read_obj1 = specific_reader
331 .read_from_value(&mut to_read2)
332 .expect("Should read from value");
333 let read_obj2 = specific_reader
334 .read(&mut to_read3)
335 .expect("Should read from deserilize");
336 let expected_value: Value = obj.clone().into();
337 pretty_assertions::assert_eq!(obj, read_obj1);
338 pretty_assertions::assert_eq!(obj, read_obj2);
339 pretty_assertions::assert_eq!(val, expected_value);
340
341 Ok(())
342 }
343
344 #[test]
345 fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
346 let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
347 let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
348 let generic_reader = GenericSingleObjectReader::builder()
349 .schema(TestSingleObjectReader::get_schema())
350 .header(header_builder.build_header())
351 .build()
352 .expect("failed to build reader");
353 let data_to_read: Vec<u8> = vec![
355 3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, 0, 0, 0, 0, 0,
356 0, 0, 0, 0, 0,
357 ];
358 let mut to_read = &data_to_read[..];
359 let read_result = generic_reader.read_value(&mut to_read)?;
360 assert_eq!(
361 read_result,
362 Value::Record(vec![
363 ("a".to_string(), Value::Long(0)),
364 ("b".to_string(), Value::Double(0.0)),
365 ("c".to_string(), Value::Array(vec![]))
366 ])
367 );
368 Ok(())
369 }
370}