apache_avro/reader/
single_object.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::decode::decode_internal;
19use crate::error::Details;
20use crate::headers::{HeaderBuilder, RabinFingerprintHeader};
21use crate::schema::ResolvedOwnedSchema;
22use crate::types::Value;
23use crate::{AvroResult, AvroSchema, Schema, from_value};
24use serde::de::DeserializeOwned;
25use std::io::Read;
26use std::marker::PhantomData;
27
/// Reader for messages that consist of a fixed header followed by a datum
/// encoded with a known writer schema, producing untyped [`Value`]s.
pub struct GenericSingleObjectReader {
    /// Resolved form of the schema the datum was written with; used to decode the body.
    write_schema: ResolvedOwnedSchema,
    /// Exact bytes every message must start with, as produced by the header builder.
    expected_header: Vec<u8>,
}
32
33impl GenericSingleObjectReader {
34    pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
35        let header_builder = RabinFingerprintHeader::from_schema(&schema);
36        Self::new_with_header_builder(schema, header_builder)
37    }
38
39    pub fn new_with_header_builder<HB: HeaderBuilder>(
40        schema: Schema,
41        header_builder: HB,
42    ) -> AvroResult<GenericSingleObjectReader> {
43        let expected_header = header_builder.build_header();
44        Ok(GenericSingleObjectReader {
45            write_schema: ResolvedOwnedSchema::try_from(schema)?,
46            expected_header,
47        })
48    }
49
50    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
51        let mut header = vec![0; self.expected_header.len()];
52        match reader.read_exact(&mut header) {
53            Ok(_) => {
54                if self.expected_header == header {
55                    decode_internal(
56                        self.write_schema.get_root_schema(),
57                        self.write_schema.get_names(),
58                        &None,
59                        reader,
60                    )
61                } else {
62                    Err(
63                        Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header)
64                            .into(),
65                    )
66                }
67            }
68            Err(io_error) => Err(Details::ReadHeader(io_error).into()),
69        }
70    }
71}
72
/// Typed counterpart of [`GenericSingleObjectReader`]: the schema comes from
/// `T`'s [`AvroSchema`] implementation and reads produce `T` instead of a [`Value`].
pub struct SpecificSingleObjectReader<T>
where
    T: AvroSchema,
{
    /// Untyped reader that validates the header and decodes the datum.
    inner: GenericSingleObjectReader,
    /// Ties the reader to `T` without storing a value of that type.
    _model: PhantomData<T>,
}
80
81impl<T> SpecificSingleObjectReader<T>
82where
83    T: AvroSchema,
84{
85    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
86        Ok(SpecificSingleObjectReader {
87            inner: GenericSingleObjectReader::new(T::get_schema())?,
88            _model: PhantomData,
89        })
90    }
91}
92
93impl<T> SpecificSingleObjectReader<T>
94where
95    T: AvroSchema + From<Value>,
96{
97    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
98        self.inner.read_value(reader).map(|v| v.into())
99    }
100}
101
102impl<T> SpecificSingleObjectReader<T>
103where
104    T: AvroSchema + DeserializeOwned,
105{
106    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
107        from_value::<T>(&self.inner.read_value(reader)?)
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encode::encode;
    use crate::headers::GlueSchemaUuidHeader;
    use crate::rabin::Rabin;
    use crate::{AvroSchema, Error, Schema};
    use apache_avro_test_helper::TestResult;
    use serde::Deserialize;
    use uuid::Uuid;

    /// Record fixture shared by every test in this module.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        /// Parses the hand-written record schema matching the fixture's fields.
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    /// Manual `Value` -> fixture conversion used by `read_from_value`.
    ///
    /// Panics on anything that is not a record with exactly the expected
    /// field/value pairs; non-string items inside `c` are skipped.
    impl From<Value> for TestSingleObjectReader {
        fn from(obj: Value) -> TestSingleObjectReader {
            if let Value::Record(fields) = obj {
                let mut a = None;
                let mut b = None;
                let mut c = vec![];
                for (field_name, v) in fields {
                    match (field_name.as_str(), v) {
                        ("a", Value::Long(i)) => a = Some(i),
                        ("b", Value::Double(d)) => b = Some(d),
                        ("c", Value::Array(v)) => {
                            for inner_val in v {
                                if let Value::String(s) = inner_val {
                                    c.push(s);
                                }
                            }
                        }
                        (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                    }
                }
                TestSingleObjectReader {
                    a: a.unwrap(),
                    b: b.unwrap(),
                    c,
                }
            } else {
                panic!("Expected a Value::Record but was {obj:?}")
            }
        }
    }

    /// Fixture -> `Value` conversion used to build the datum that gets encoded.
    impl From<TestSingleObjectReader> for Value {
        fn from(obj: TestSingleObjectReader) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    /// A message made of the two-byte marker, the Rabin fingerprint of the
    /// schema and the encoded datum is read back as the original value.
    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let mut to_read = &to_read[..];
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        pretty_assertions::assert_eq!(expected_value, val);

        Ok(())
    }

    /// Same message as above, but served through a chain of three readers
    /// (marker, fingerprint, datum) so the header spans more than one
    /// underlying reader.
    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };
        // The two-byte marker, to show that the message uses this single-record format
        let to_read_1 = [0xC3, 0x01];
        let mut to_read_2 = Vec::<u8>::new();
        to_read_2.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        let mut to_read_3 = Vec::<u8>::new();
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read_3,
        )
        .expect("Encode should succeed");
        let mut to_read = (&to_read_1[..]).chain(&to_read_2[..]).chain(&to_read_3[..]);
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let val = generic_reader
            .read_value(&mut to_read)
            .expect("Should read");
        let expected_value: Value = obj.into();
        pretty_assertions::assert_eq!(expected_value, val);

        Ok(())
    }

    /// The generic reader and both typed entry points (`read_from_value` and
    /// `read`) agree on the same input bytes.
    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut to_read = Vec::<u8>::new();
        to_read.extend_from_slice(&[0xC3, 0x01]);
        to_read.extend_from_slice(
            &TestSingleObjectReader::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..],
        );
        encode(
            &obj.clone().into(),
            &TestSingleObjectReader::get_schema(),
            &mut to_read,
        )
        .expect("Encode should succeed");
        let generic_reader = GenericSingleObjectReader::new(TestSingleObjectReader::get_schema())
            .expect("Schema should resolve");
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");
        // Each reader gets its own cursor over the same bytes.
        let mut to_read1 = &to_read[..];
        let mut to_read2 = &to_read[..];
        let mut to_read3 = &to_read[..];

        let val = generic_reader
            .read_value(&mut to_read1)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut to_read2)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut to_read3)
            .expect("Should read from deserilize");
        let expected_value: Value = obj.clone().into();
        pretty_assertions::assert_eq!(obj, read_obj1);
        pretty_assertions::assert_eq!(obj, read_obj2);
        pretty_assertions::assert_eq!(val, expected_value);

        Ok(())
    }

    /// A reader built with a `GlueSchemaUuidHeader` accepts a message that
    /// starts with that header; since the input holds nothing after the
    /// header, the read must then fail while decoding the datum.
    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
            TestSingleObjectReader::get_schema(),
            header_builder,
        )
        .expect("failed to build reader");
        // Header bytes only: no encoded datum follows.
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
        ];
        let mut to_read = &data_to_read[..];
        let read_result = generic_reader
            .read_value(&mut to_read)
            .map_err(Error::into_details);
        // A bare `matches!(..)` only yields a `bool`; the outcome has to be
        // checked explicitly or the test can never fail. The concrete variant
        // raised by the decoder is not pinned here; what matters is that the
        // failure is neither of the two header-stage errors.
        match read_result {
            Ok(value) => {
                panic!("Expected an error for a message without a body but read {value:?}")
            }
            Err(Details::ReadHeader(_)) | Err(Details::SingleObjectHeaderMismatch(..)) => {
                panic!("The alternate header should have been accepted")
            }
            Err(_) => {}
        }
        Ok(())
    }
}