Skip to main content

apache_avro/reader/
block.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    collections::HashMap,
20    io::{ErrorKind, Read},
21    str::FromStr,
22};
23
24use log::warn;
25use serde::de::DeserializeOwned;
26use serde_json::from_slice;
27
28use crate::{
29    AvroResult, Codec, Error,
30    decode::{decode, decode_internal},
31    error::Details,
32    schema::{Names, Schema, resolve_names, resolve_names_with_schemata},
33    serde::deser_schema::{Config, SchemaAwareDeserializer},
34    types::Value,
35    util,
36};
37
38/// Internal Block reader.
39#[derive(Debug, Clone)]
40pub(super) struct Block<'r, R> {
41    reader: R,
42    /// Internal buffering to reduce allocation.
43    buf: Vec<u8>,
44    buf_idx: usize,
45    /// Number of elements expected to exist within this block.
46    message_count: usize,
47    marker: [u8; 16],
48    codec: Codec,
49    pub(super) writer_schema: Schema,
50    schemata: Vec<&'r Schema>,
51    pub(super) user_metadata: HashMap<String, Vec<u8>>,
52    names_refs: Names,
53    human_readable: bool,
54}
55
56impl<'r, R: Read> Block<'r, R> {
57    pub(super) fn new(
58        reader: R,
59        schemata: Vec<&'r Schema>,
60        human_readable: bool,
61    ) -> AvroResult<Block<'r, R>> {
62        let mut block = Block {
63            reader,
64            codec: Codec::Null,
65            writer_schema: Schema::Null,
66            schemata,
67            buf: vec![],
68            buf_idx: 0,
69            message_count: 0,
70            marker: [0; 16],
71            user_metadata: Default::default(),
72            names_refs: Default::default(),
73            human_readable,
74        };
75
76        block.read_header()?;
77        Ok(block)
78    }
79
80    /// Try to read the header and to set the writer `Schema`, the `Codec` and the marker based on
81    /// its content.
82    fn read_header(&mut self) -> AvroResult<()> {
83        let mut buf = [0u8; 4];
84        self.reader
85            .read_exact(&mut buf)
86            .map_err(Details::ReadHeader)?;
87
88        if buf != [b'O', b'b', b'j', 1u8] {
89            return Err(Details::HeaderMagic.into());
90        }
91
92        let meta_schema = Schema::map(Schema::Bytes).build();
93        match decode(&meta_schema, &mut self.reader)? {
94            Value::Map(metadata) => {
95                self.read_writer_schema(&metadata)?;
96                self.codec = read_codec(&metadata)?;
97
98                for (key, value) in metadata {
99                    if key == "avro.schema"
100                        || key == "avro.codec"
101                        || key == "avro.codec.compression_level"
102                    {
103                        // already processed
104                    } else if key.starts_with("avro.") {
105                        warn!("Ignoring unknown metadata key: {key}");
106                    } else {
107                        self.read_user_metadata(key, value);
108                    }
109                }
110            }
111            _ => {
112                return Err(Details::GetHeaderMetadata.into());
113            }
114        }
115
116        self.reader
117            .read_exact(&mut self.marker)
118            .map_err(|e| Details::ReadMarker(e).into())
119    }
120
121    fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
122        // The buffer needs to contain exactly `n` elements, otherwise codecs will potentially read
123        // invalid bytes.
124        //
125        // The are two cases to handle here:
126        //
127        // 1. `n > self.buf.len()`:
128        //    In this case we call `Vec::resize`, which guarantees that `self.buf.len() == n`.
129        // 2. `n < self.buf.len()`:
130        //    We need to resize to ensure that the buffer len is safe to read `n` elements.
131        //
132        // TODO: Figure out a way to avoid having to truncate for the second case.
133        self.buf.resize(util::safe_len(n)?, 0);
134        self.reader
135            .read_exact(&mut self.buf)
136            .map_err(Details::ReadIntoBuf)?;
137        self.buf_idx = 0;
138        Ok(())
139    }
140
141    /// Try to read a data block, also performing schema resolution for the objects contained in
142    /// the block. The objects are stored in an internal buffer to the `Reader`.
143    fn read_block_next(&mut self) -> AvroResult<()> {
144        assert!(self.is_empty(), "Expected self to be empty!");
145        match util::read_long(&mut self.reader).map_err(Error::into_details) {
146            Ok(block_len) => {
147                self.message_count = block_len as usize;
148                let block_bytes = util::read_long(&mut self.reader)?;
149                self.fill_buf(block_bytes as usize)?;
150                let mut marker = [0u8; 16];
151                self.reader
152                    .read_exact(&mut marker)
153                    .map_err(Details::ReadBlockMarker)?;
154
155                if marker != self.marker {
156                    return Err(Details::GetBlockMarker.into());
157                }
158
159                // NOTE (JAB): This doesn't fit this Reader pattern very well.
160                // `self.buf` is a growable buffer that is reused as the reader is iterated.
161                // For non `Codec::Null` variants, `decompress` will allocate a new `Vec`
162                // and replace `buf` with the new one, instead of reusing the same buffer.
163                // We can address this by using some "limited read" type to decode directly
164                // into the buffer. But this is fine, for now.
165                self.codec.decompress(&mut self.buf)
166            }
167            Err(Details::ReadVariableIntegerBytes(io_err)) => {
168                if let ErrorKind::UnexpectedEof = io_err.kind() {
169                    // to not return any error in case we only finished to read cleanly from the stream
170                    Ok(())
171                } else {
172                    Err(Details::ReadVariableIntegerBytes(io_err).into())
173                }
174            }
175            Err(e) => Err(Error::new(e)),
176        }
177    }
178
179    fn len(&self) -> usize {
180        self.message_count
181    }
182
183    fn is_empty(&self) -> bool {
184        self.len() == 0
185    }
186
187    pub(super) fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
188        if self.is_empty() {
189            self.read_block_next()?;
190            if self.is_empty() {
191                return Ok(None);
192            }
193        }
194
195        let mut block_bytes = &self.buf[self.buf_idx..];
196        let b_original = block_bytes.len();
197
198        let item = decode_internal(
199            &self.writer_schema,
200            &self.names_refs,
201            None,
202            &mut block_bytes,
203        )?;
204        let item = match read_schema {
205            Some(schema) => item.resolve(schema)?,
206            None => item,
207        };
208
209        if b_original != 0 && b_original == block_bytes.len() {
210            // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
211            return Err(Details::ReadBlock.into());
212        }
213        self.buf_idx += b_original - block_bytes.len();
214        self.message_count -= 1;
215        Ok(Some(item))
216    }
217
218    pub(super) fn read_next_deser<T: DeserializeOwned>(
219        &mut self,
220        reader_schema: Option<&Schema>,
221    ) -> AvroResult<Option<T>> {
222        if self.is_empty() {
223            self.read_block_next()?;
224            if self.is_empty() {
225                return Ok(None);
226            }
227        }
228
229        let mut block_bytes = &self.buf[self.buf_idx..];
230        let b_original = block_bytes.len();
231
232        let item = if reader_schema.is_some() {
233            // TODO: Implement SchemaAwareResolvingDeserializer
234            panic!("Schema aware deserialisation does not resolve schemas yet");
235        } else {
236            let config = Config {
237                names: &self.names_refs,
238                human_readable: self.human_readable,
239            };
240            T::deserialize(SchemaAwareDeserializer::new(
241                &mut block_bytes,
242                &self.writer_schema,
243                config,
244            )?)?
245        };
246
247        if b_original != 0 && b_original == block_bytes.len() {
248            // No bytes were read, return an error to avoid an infinite loop
249            return Err(Details::ReadBlock.into());
250        }
251        self.buf_idx += b_original - block_bytes.len();
252        self.message_count -= 1;
253        Ok(Some(item))
254    }
255
256    fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
257        let json: serde_json::Value = metadata
258            .get("avro.schema")
259            .and_then(|bytes| {
260                if let Value::Bytes(ref bytes) = *bytes {
261                    from_slice(bytes.as_ref()).ok()
262                } else {
263                    None
264                }
265            })
266            .ok_or(Details::GetAvroSchemaFromMap)?;
267        if !self.schemata.is_empty() {
268            let mut names = HashMap::new();
269            resolve_names_with_schemata(
270                self.schemata.iter().copied(),
271                &mut names,
272                None,
273                &HashMap::new(),
274            )?;
275            self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
276            self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
277        } else {
278            self.writer_schema = Schema::parse(&json)?;
279            let mut names = HashMap::new();
280            resolve_names(&self.writer_schema, &mut names, None, &HashMap::new())?;
281            self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
282        }
283        Ok(())
284    }
285
286    fn read_user_metadata(&mut self, key: String, value: Value) {
287        match value {
288            Value::Bytes(ref vec) => {
289                self.user_metadata.insert(key, vec.clone());
290            }
291            wrong => {
292                warn!("User metadata values must be Value::Bytes, found {wrong:?}");
293            }
294        }
295    }
296}
297
298fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
299    let result = metadata
300        .get("avro.codec")
301        .map(|codec| {
302            if let Value::Bytes(ref bytes) = *codec {
303                match std::str::from_utf8(bytes.as_ref()) {
304                    Ok(utf8) => Ok(utf8),
305                    Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
306                }
307            } else {
308                Err(Details::BadCodecMetadata.into())
309            }
310        })
311        .map(|codec_res| match codec_res {
312            Ok(codec) => match Codec::from_str(codec) {
313                Ok(codec) => match codec {
314                    #[cfg(feature = "bzip")]
315                    Codec::Bzip2(_) => {
316                        use crate::Bzip2Settings;
317                        if let Some(Value::Bytes(bytes)) =
318                            metadata.get("avro.codec.compression_level")
319                        {
320                            Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
321                        } else {
322                            Ok(codec)
323                        }
324                    }
325                    #[cfg(feature = "xz")]
326                    Codec::Xz(_) => {
327                        use crate::XzSettings;
328                        if let Some(Value::Bytes(bytes)) =
329                            metadata.get("avro.codec.compression_level")
330                        {
331                            Ok(Codec::Xz(XzSettings::new(bytes[0])))
332                        } else {
333                            Ok(codec)
334                        }
335                    }
336                    #[cfg(feature = "zstandard")]
337                    Codec::Zstandard(_) => {
338                        use crate::ZstandardSettings;
339                        if let Some(Value::Bytes(bytes)) =
340                            metadata.get("avro.codec.compression_level")
341                        {
342                            Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
343                        } else {
344                            Ok(codec)
345                        }
346                    }
347                    _ => Ok(codec),
348                },
349                Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
350            },
351            Err(err) => Err(err),
352        });
353
354    result.unwrap_or(Ok(Codec::Null))
355}