1use crate::decode::decode_internal;
19use crate::error::Details;
20use crate::headers::{HeaderBuilder, RabinFingerprintHeader};
21use crate::schema::ResolvedOwnedSchema;
22use crate::types::Value;
23use crate::{AvroResult, AvroSchema, Schema, from_value};
24use serde::de::DeserializeOwned;
25use std::io::Read;
26use std::marker::PhantomData;
27
28pub struct GenericSingleObjectReader {
29 write_schema: ResolvedOwnedSchema,
30 expected_header: Vec<u8>,
31}
32
33impl GenericSingleObjectReader {
34 pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
35 let header_builder = RabinFingerprintHeader::from_schema(&schema);
36 Self::new_with_header_builder(schema, header_builder)
37 }
38
39 pub fn new_with_header_builder<HB: HeaderBuilder>(
40 schema: Schema,
41 header_builder: HB,
42 ) -> AvroResult<GenericSingleObjectReader> {
43 let expected_header = header_builder.build_header();
44 Ok(GenericSingleObjectReader {
45 write_schema: ResolvedOwnedSchema::try_from(schema)?,
46 expected_header,
47 })
48 }
49
50 pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
51 let mut header = vec![0; self.expected_header.len()];
52 match reader.read_exact(&mut header) {
53 Ok(_) => {
54 if self.expected_header == header {
55 decode_internal(
56 self.write_schema.get_root_schema(),
57 self.write_schema.get_names(),
58 &None,
59 reader,
60 )
61 } else {
62 Err(
63 Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
64 .into(),
65 )
66 }
67 }
68 Err(io_error) => Err(Details::ReadHeader(io_error).into()),
69 }
70 }
71}
72
73pub struct SpecificSingleObjectReader<T>
74where
75 T: AvroSchema,
76{
77 inner: GenericSingleObjectReader,
78 _model: PhantomData<T>,
79}
80
81impl<T> SpecificSingleObjectReader<T>
82where
83 T: AvroSchema,
84{
85 pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
86 Ok(SpecificSingleObjectReader {
87 inner: GenericSingleObjectReader::new(T::get_schema())?,
88 _model: PhantomData,
89 })
90 }
91}
92
93impl<T> SpecificSingleObjectReader<T>
94where
95 T: AvroSchema + From<Value>,
96{
97 pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
98 self.inner.read_value(reader).map(|v| v.into())
99 }
100}
101
102impl<T> SpecificSingleObjectReader<T>
103where
104 T: AvroSchema + DeserializeOwned,
105{
106 pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
107 from_value::<T>(&self.inner.read_value(reader)?)
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encode::encode;
    use crate::headers::GlueSchemaUuidHeader;
    use crate::rabin::Rabin;
    use crate::{AvroSchema, Error, Schema};
    use apache_avro_test_helper::TestResult;
    use serde::Deserialize;
    use uuid::Uuid;

    /// Record type used as the payload in every test of this module.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        /// Parses the record schema matching the fields of [`TestSingleObjectReader`].
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    impl From<Value> for TestSingleObjectReader {
        /// Rebuilds the struct from a decoded record.
        ///
        /// Panics if `obj` is not a record, if a field has an unexpected name or type, or if
        /// `a` or `b` is missing. Non-string items inside `c` are skipped.
        fn from(obj: Value) -> TestSingleObjectReader {
            if let Value::Record(fields) = obj {
                let mut a = None;
                let mut b = None;
                let mut c = vec![];
                for (field_name, v) in fields {
                    match (field_name.as_str(), v) {
                        ("a", Value::Long(i)) => a = Some(i),
                        ("b", Value::Double(d)) => b = Some(d),
                        ("c", Value::Array(v)) => {
                            for inner_val in v {
                                if let Value::String(s) = inner_val {
                                    c.push(s);
                                }
                            }
                        }
                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                    }
                }
                TestSingleObjectReader {
                    a: a.unwrap(),
                    b: b.unwrap(),
                    c,
                }
            } else {
                panic!("Expected a Value::Record but was {obj:?}")
            }
        }
    }

    impl From<TestSingleObjectReader> for Value {
        /// Converts the struct into the record value the readers are expected to decode.
        fn from(obj: TestSingleObjectReader) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    /// A value prefixed with `C3 01` and the Rabin fingerprint of its schema is read back
    /// unchanged by the generic reader.
    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let mut to_read = &to_read[..];
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        pretty_assertions::assert_eq!(expected_value, val);

        Ok(())
    }

    /// Same as the basic test, but the input is a chain of three separate readers (marker
    /// bytes, fingerprint, payload), so the header is not available from a single source.
    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let to_read_1 = [0xC3, 0x01];
        let mut to_read_2 = Vec::<u8>::new();
        to_read_2.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        let mut to_read_3 = Vec::<u8>::new();
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read_3,
        )
        .expect("Encode should succeed");
        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        pretty_assertions::assert_eq!(expected_value, val);

        Ok(())
    }

    /// The generic reader and both entry points of the specific reader agree on the same input.
    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");
        // Every reader gets its own cursor over the same bytes.
        let mut to_read1 = &to_read[..];
        let mut to_read2 = &to_read[..];
        let mut to_read3 = &to_read[..];

        let val = generic_reader
            .read_value(&mut to_read1)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut to_read2)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut to_read3)
            .expect("Should read from deserilize");
        let expected_value: Value = obj.clone().into();
        pretty_assertions::assert_eq!(obj, read_obj1);
        pretty_assertions::assert_eq!(obj, read_obj2);
        pretty_assertions::assert_eq!(val, expected_value);

        Ok(())
    }

    /// A reader built with a [`GlueSchemaUuidHeader`] accepts an input that starts with that
    /// header; because no payload follows, the read must then fail while decoding the value.
    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
            TestSingleObjectReader::get_schema(),
            header_builder,
        )
        .expect("failed to build reader");
        // Two marker bytes followed by the 16 bytes of the UUID above, and nothing else.
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
        ];
        let mut to_read = &data_to_read[..];
        let read_result = generic_reader
            .read_value(&mut to_read)
            .map_err(Error::into_details);
        // A bare `matches!(..)` statement only computes a bool and throws it away, so the
        // outcome has to be checked explicitly for this test to be able to fail.
        // NOTE(review): the pattern previously written here was `Details::ReadBytes(_)`, but it
        // was never enforced; the exact variant returned at end of input is not verified here.
        // TODO: confirm the variant and tighten the last arm.
        match read_result {
            Ok(_) => panic!("reading an input that holds only a header should fail"),
            Err(Details::SingleObjectHeaderMismatch(..)) => {
                panic!("the alternate header should have been accepted")
            }
            Err(Details::ReadHeader(_)) => {
                panic!("the complete header should have been readable")
            }
            Err(_) => {}
        }
        Ok(())
    }
}