// apache_avro/reader/single_object.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::{io::Read, marker::PhantomData};
19
20use bon::bon;
21use serde::de::DeserializeOwned;
22
23use crate::{
24    AvroResult, AvroSchema, Schema,
25    decode::decode_internal,
26    error::Details,
27    headers::{HeaderBuilder, RabinFingerprintHeader},
28    schema::ResolvedOwnedSchema,
29    serde::deser_schema::{Config, SchemaAwareDeserializer},
30    types::Value,
31    util::is_human_readable,
32};
33
34pub struct GenericSingleObjectReader {
35    write_schema: ResolvedOwnedSchema,
36    expected_header: Vec<u8>,
37    human_readable: bool,
38}
39
40#[bon]
41impl GenericSingleObjectReader {
42    #[builder]
43    pub fn new(
44        schema: Schema,
45        /// The expected header.
46        #[builder(default = RabinFingerprintHeader::from_schema(&schema).build_header())]
47        header: Vec<u8>,
48        /// Was the data serialized with `human_readable`.
49        #[builder(default = is_human_readable())]
50        human_readable: bool,
51    ) -> AvroResult<GenericSingleObjectReader> {
52        Ok(Self {
53            write_schema: schema.try_into()?,
54            expected_header: header,
55            human_readable,
56        })
57    }
58}
59
60impl GenericSingleObjectReader {
61    pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
62        self.read_header(reader)?;
63        decode_internal(
64            self.write_schema.get_root_schema(),
65            self.write_schema.get_names(),
66            None,
67            reader,
68        )
69    }
70
71    pub fn read_deser<T: DeserializeOwned>(&self, reader: &mut impl Read) -> AvroResult<T> {
72        self.read_header(reader)?;
73        let config = Config {
74            names: self.write_schema.get_names(),
75            human_readable: self.human_readable,
76        };
77        T::deserialize(SchemaAwareDeserializer::new(
78            reader,
79            self.write_schema.get_root_schema(),
80            config,
81        )?)
82    }
83
84    fn read_header(&self, reader: &mut impl Read) -> AvroResult<()> {
85        let mut header = vec![0; self.expected_header.len()];
86        reader
87            .read_exact(&mut header)
88            .map_err(Details::ReadHeader)?;
89        if self.expected_header == header {
90            Ok(())
91        } else {
92            Err(Details::SingleObjectHeaderMismatch(self.expected_header.clone(), header).into())
93        }
94    }
95}
96
97pub struct SpecificSingleObjectReader<T>
98where
99    T: AvroSchema,
100{
101    inner: GenericSingleObjectReader,
102    _model: PhantomData<T>,
103}
104
105impl<T> SpecificSingleObjectReader<T>
106where
107    T: AvroSchema,
108{
109    pub fn new() -> AvroResult<SpecificSingleObjectReader<T>> {
110        Ok(SpecificSingleObjectReader {
111            inner: GenericSingleObjectReader::builder()
112                .schema(T::get_schema())
113                .build()?,
114            _model: PhantomData,
115        })
116    }
117}
118
119impl<T> SpecificSingleObjectReader<T>
120where
121    T: AvroSchema + From<Value>,
122{
123    pub fn read_from_value<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
124        self.inner.read_value(reader).map(|v| v.into())
125    }
126}
127
128impl<T> SpecificSingleObjectReader<T>
129where
130    T: AvroSchema + DeserializeOwned,
131{
132    pub fn read<R: Read>(&self, reader: &mut R) -> AvroResult<T> {
133        self.inner.read_deser(reader)
134    }
135}
136
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use serde::Deserialize;
    use uuid::Uuid;

    use super::*;
    use crate::{AvroSchema, Schema, encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin};

    /// The two-byte marker, to show that the message uses this single-record format.
    const SINGLE_OBJECT_MARKER: [u8; 2] = [0xC3, 0x01];

    /// Record type used as the fixture by every test in this module.
    #[derive(Deserialize, Clone, PartialEq, Debug)]
    struct TestSingleObjectReader {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    impl AvroSchema for TestSingleObjectReader {
        fn get_schema() -> Schema {
            let schema_json = r#"
            {
                "type":"record",
                "name":"TestSingleObjectReader",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema_json).unwrap()
        }
    }

    impl From<Value> for TestSingleObjectReader {
        /// Rebuilds the fixture from a record value; panics on any other
        /// value or on an unexpected field.
        fn from(obj: Value) -> TestSingleObjectReader {
            match obj {
                Value::Record(fields) => {
                    let (mut a, mut b, mut c) = (None, None, Vec::new());
                    for (field_name, v) in fields {
                        match (field_name.as_str(), v) {
                            ("a", Value::Long(i)) => a = Some(i),
                            ("b", Value::Double(d)) => b = Some(d),
                            // Array items that are not strings are skipped.
                            ("c", Value::Array(items)) => {
                                c.extend(items.into_iter().filter_map(|item| match item {
                                    Value::String(s) => Some(s),
                                    _ => None,
                                }))
                            }
                            (key, value) => panic!("Unexpected pair: {key:?} -> {value:?}"),
                        }
                    }
                    TestSingleObjectReader {
                        a: a.unwrap(),
                        b: b.unwrap(),
                        c,
                    }
                }
                obj => panic!("Expected a Value::Record but was {obj:?}"),
            }
        }
    }

    impl From<TestSingleObjectReader> for Value {
        /// Converts the fixture into a record value with fields `a`, `b`, `c`.
        fn from(obj: TestSingleObjectReader) -> Value {
            let TestSingleObjectReader { a, b, c } = obj;
            let items: Vec<Value> = c.into_iter().map(Value::from).collect();
            Value::Record(vec![
                ("a".into(), a.into()),
                ("b".into(), b.into()),
                ("c".into(), Value::Array(items)),
            ])
        }
    }

    /// Returns the record instance the round-trip tests encode and read back.
    fn sample_record() -> TestSingleObjectReader {
        TestSingleObjectReader {
            a: 42,
            b: 3.33,
            c: vec!["cat".into(), "dog".into()],
        }
    }

    /// Builds a complete message for `obj` in one buffer: the marker, the
    /// Rabin fingerprint of the fixture schema, then the encoded record.
    fn single_object_message(obj: &TestSingleObjectReader) -> Vec<u8> {
        let schema = TestSingleObjectReader::get_schema();
        let mut message = Vec::<u8>::new();
        message.extend_from_slice(&SINGLE_OBJECT_MARKER);
        message.extend_from_slice(&schema.fingerprint::<Rabin>().bytes[..]);
        encode(&obj.clone().into(), &schema, &mut message).expect("Encode should succeed");
        message
    }

    /// Builds a generic reader for the fixture schema with builder defaults.
    fn default_generic_reader() -> GenericSingleObjectReader {
        GenericSingleObjectReader::builder()
            .schema(TestSingleObjectReader::get_schema())
            .build()
            .expect("Schema should resolve")
    }

    #[test]
    fn test_avro_3507_single_object_reader() -> TestResult {
        let obj = sample_record();
        let message = single_object_message(&obj);
        let mut to_read = message.as_slice();

        let val = default_generic_reader()
            .read_value(&mut to_read)
            .expect("Should read");

        let expected_value: Value = obj.into();
        pretty_assertions::assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn avro_3642_test_single_object_reader_incomplete_reads() -> TestResult {
        let obj = sample_record();
        let schema = TestSingleObjectReader::get_schema();

        // Marker, fingerprint and body live in three separate buffers that
        // are chained together, so the message spans several sources.
        let marker = SINGLE_OBJECT_MARKER;
        let fingerprint = schema.fingerprint::<Rabin>().bytes[..].to_vec();
        let mut body = Vec::<u8>::new();
        encode(&obj.clone().into(), &schema, &mut body).expect("Encode should succeed");
        let mut to_read = (&marker[..]).chain(&fingerprint[..]).chain(&body[..]);

        let val = default_generic_reader()
            .read_value(&mut to_read)
            .expect("Should read");

        let expected_value: Value = obj.into();
        pretty_assertions::assert_eq!(expected_value, val);

        Ok(())
    }

    #[test]
    fn test_avro_3507_reader_parity() -> TestResult {
        let obj = sample_record();
        let message = single_object_message(&obj);

        let generic_reader = default_generic_reader();
        let specific_reader = SpecificSingleObjectReader::<TestSingleObjectReader>::new()
            .expect("schema should resolve");

        // Each read path gets its own cursor over the same bytes.
        let mut for_generic = message.as_slice();
        let mut for_from_value = message.as_slice();
        let mut for_serde = message.as_slice();

        let val = generic_reader
            .read_value(&mut for_generic)
            .expect("Should read");
        let read_obj1 = specific_reader
            .read_from_value(&mut for_from_value)
            .expect("Should read from value");
        let read_obj2 = specific_reader
            .read(&mut for_serde)
            .expect("Should read from deserilize");

        let expected_value: Value = obj.clone().into();
        pretty_assertions::assert_eq!(obj, read_obj1);
        pretty_assertions::assert_eq!(obj, read_obj2);
        pretty_assertions::assert_eq!(val, expected_value);

        Ok(())
    }

    #[test]
    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let expected_header = GlueSchemaUuidHeader::from_uuid(schema_uuid).build_header();
        let generic_reader = GenericSingleObjectReader::builder()
            .schema(TestSingleObjectReader::get_schema())
            .header(expected_header)
            .build()
            .expect("failed to build reader");

        // First 18 bytes are the header, then it's a varint 0, double 0 and an empty list (0)
        let data_to_read: Vec<u8> = vec![
            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0,
        ];
        let mut to_read = data_to_read.as_slice();
        let read_result = generic_reader.read_value(&mut to_read)?;

        let expected = Value::Record(vec![
            ("a".to_string(), Value::Long(0)),
            ("b".to_string(), Value::Double(0.0)),
            ("c".to_string(), Value::Array(vec![])),
        ]);
        assert_eq!(read_result, expected);
        Ok(())
    }
}