1use std::{
19 collections::HashMap,
20 io::{ErrorKind, Read},
21 str::FromStr,
22};
23
24use log::warn;
25use serde::de::DeserializeOwned;
26use serde_json::from_slice;
27
28use crate::{
29 AvroResult, Codec, Error,
30 decode::{decode, decode_internal},
31 error::Details,
32 schema::{Names, Schema, resolve_names, resolve_names_with_schemata},
33 serde::deser_schema::{Config, SchemaAwareDeserializer},
34 types::Value,
35 util,
36};
37
/// Reader state for an Avro object container file: the parsed header plus the
/// currently buffered data block.
#[derive(Debug, Clone)]
pub(super) struct Block<'r, R> {
    /// Underlying byte source the header and blocks are read from.
    reader: R,
    /// Bytes of the current data block; decompressed with `codec` before objects are decoded.
    buf: Vec<u8>,
    /// Offset into `buf` of the next object that has not been decoded yet.
    buf_idx: usize,
    /// Number of objects left to decode in the current block.
    message_count: usize,
    /// 16-byte sync marker read from the header; every data block must be followed by it.
    marker: [u8; 16],
    /// Compression codec declared by the header (`avro.codec`).
    codec: Codec,
    /// Schema the data was written with (`avro.schema` header entry).
    pub(super) writer_schema: Schema,
    /// Extra schemas whose named types are made available when parsing the writer schema.
    schemata: Vec<&'r Schema>,
    /// Header metadata entries whose keys do not start with `avro.`.
    pub(super) user_metadata: HashMap<String, Vec<u8>>,
    /// Named types passed to the decoder when decoding with `writer_schema`.
    names_refs: Names,
    /// Forwarded to the serde deserializer configuration.
    human_readable: bool,
}
55
56impl<'r, R: Read> Block<'r, R> {
57 pub(super) fn new(
58 reader: R,
59 schemata: Vec<&'r Schema>,
60 human_readable: bool,
61 ) -> AvroResult<Block<'r, R>> {
62 let mut block = Block {
63 reader,
64 codec: Codec::Null,
65 writer_schema: Schema::Null,
66 schemata,
67 buf: vec![],
68 buf_idx: 0,
69 message_count: 0,
70 marker: [0; 16],
71 user_metadata: Default::default(),
72 names_refs: Default::default(),
73 human_readable,
74 };
75
76 block.read_header()?;
77 Ok(block)
78 }
79
80 fn read_header(&mut self) -> AvroResult<()> {
83 let mut buf = [0u8; 4];
84 self.reader
85 .read_exact(&mut buf)
86 .map_err(Details::ReadHeader)?;
87
88 if buf != [b'O', b'b', b'j', 1u8] {
89 return Err(Details::HeaderMagic.into());
90 }
91
92 let meta_schema = Schema::map(Schema::Bytes).build();
93 match decode(&meta_schema, &mut self.reader)? {
94 Value::Map(metadata) => {
95 self.read_writer_schema(&metadata)?;
96 self.codec = read_codec(&metadata)?;
97
98 for (key, value) in metadata {
99 if key == "avro.schema"
100 || key == "avro.codec"
101 || key == "avro.codec.compression_level"
102 {
103 } else if key.starts_with("avro.") {
105 warn!("Ignoring unknown metadata key: {key}");
106 } else {
107 self.read_user_metadata(key, value);
108 }
109 }
110 }
111 _ => {
112 return Err(Details::GetHeaderMetadata.into());
113 }
114 }
115
116 self.reader
117 .read_exact(&mut self.marker)
118 .map_err(|e| Details::ReadMarker(e).into())
119 }
120
121 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
122 self.buf.resize(util::safe_len(n)?, 0);
134 self.reader
135 .read_exact(&mut self.buf)
136 .map_err(Details::ReadIntoBuf)?;
137 self.buf_idx = 0;
138 Ok(())
139 }
140
141 fn read_block_next(&mut self) -> AvroResult<()> {
144 assert!(self.is_empty(), "Expected self to be empty!");
145 match util::read_long(&mut self.reader).map_err(Error::into_details) {
146 Ok(block_len) => {
147 self.message_count = block_len as usize;
148 let block_bytes = util::read_long(&mut self.reader)?;
149 self.fill_buf(block_bytes as usize)?;
150 let mut marker = [0u8; 16];
151 self.reader
152 .read_exact(&mut marker)
153 .map_err(Details::ReadBlockMarker)?;
154
155 if marker != self.marker {
156 return Err(Details::GetBlockMarker.into());
157 }
158
159 self.codec.decompress(&mut self.buf)
166 }
167 Err(Details::ReadVariableIntegerBytes(io_err)) => {
168 if let ErrorKind::UnexpectedEof = io_err.kind() {
169 Ok(())
171 } else {
172 Err(Details::ReadVariableIntegerBytes(io_err).into())
173 }
174 }
175 Err(e) => Err(Error::new(e)),
176 }
177 }
178
179 fn len(&self) -> usize {
180 self.message_count
181 }
182
183 fn is_empty(&self) -> bool {
184 self.len() == 0
185 }
186
187 pub(super) fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
188 if self.is_empty() {
189 self.read_block_next()?;
190 if self.is_empty() {
191 return Ok(None);
192 }
193 }
194
195 let mut block_bytes = &self.buf[self.buf_idx..];
196 let b_original = block_bytes.len();
197
198 let item = decode_internal(
199 &self.writer_schema,
200 &self.names_refs,
201 None,
202 &mut block_bytes,
203 )?;
204 let item = match read_schema {
205 Some(schema) => item.resolve(schema)?,
206 None => item,
207 };
208
209 if b_original != 0 && b_original == block_bytes.len() {
210 return Err(Details::ReadBlock.into());
212 }
213 self.buf_idx += b_original - block_bytes.len();
214 self.message_count -= 1;
215 Ok(Some(item))
216 }
217
218 pub(super) fn read_next_deser<T: DeserializeOwned>(
219 &mut self,
220 reader_schema: Option<&Schema>,
221 ) -> AvroResult<Option<T>> {
222 if self.is_empty() {
223 self.read_block_next()?;
224 if self.is_empty() {
225 return Ok(None);
226 }
227 }
228
229 let mut block_bytes = &self.buf[self.buf_idx..];
230 let b_original = block_bytes.len();
231
232 let item = if reader_schema.is_some() {
233 panic!("Schema aware deserialisation does not resolve schemas yet");
235 } else {
236 let config = Config {
237 names: &self.names_refs,
238 human_readable: self.human_readable,
239 };
240 T::deserialize(SchemaAwareDeserializer::new(
241 &mut block_bytes,
242 &self.writer_schema,
243 config,
244 )?)?
245 };
246
247 if b_original != 0 && b_original == block_bytes.len() {
248 return Err(Details::ReadBlock.into());
250 }
251 self.buf_idx += b_original - block_bytes.len();
252 self.message_count -= 1;
253 Ok(Some(item))
254 }
255
256 fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
257 let json: serde_json::Value = metadata
258 .get("avro.schema")
259 .and_then(|bytes| {
260 if let Value::Bytes(ref bytes) = *bytes {
261 from_slice(bytes.as_ref()).ok()
262 } else {
263 None
264 }
265 })
266 .ok_or(Details::GetAvroSchemaFromMap)?;
267 if !self.schemata.is_empty() {
268 let mut names = HashMap::new();
269 resolve_names_with_schemata(
270 self.schemata.iter().copied(),
271 &mut names,
272 None,
273 &HashMap::new(),
274 )?;
275 self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
276 self.writer_schema = Schema::parse_with_names(&json, self.names_refs.clone())?;
277 } else {
278 self.writer_schema = Schema::parse(&json)?;
279 let mut names = HashMap::new();
280 resolve_names(&self.writer_schema, &mut names, None, &HashMap::new())?;
281 self.names_refs = names.into_iter().map(|(n, s)| (n, s.clone())).collect();
282 }
283 Ok(())
284 }
285
286 fn read_user_metadata(&mut self, key: String, value: Value) {
287 match value {
288 Value::Bytes(ref vec) => {
289 self.user_metadata.insert(key, vec.clone());
290 }
291 wrong => {
292 warn!("User metadata values must be Value::Bytes, found {wrong:?}");
293 }
294 }
295 }
296}
297
298fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
299 let result = metadata
300 .get("avro.codec")
301 .map(|codec| {
302 if let Value::Bytes(ref bytes) = *codec {
303 match std::str::from_utf8(bytes.as_ref()) {
304 Ok(utf8) => Ok(utf8),
305 Err(utf8_error) => Err(Details::ConvertToUtf8Error(utf8_error).into()),
306 }
307 } else {
308 Err(Details::BadCodecMetadata.into())
309 }
310 })
311 .map(|codec_res| match codec_res {
312 Ok(codec) => match Codec::from_str(codec) {
313 Ok(codec) => match codec {
314 #[cfg(feature = "bzip")]
315 Codec::Bzip2(_) => {
316 use crate::Bzip2Settings;
317 if let Some(Value::Bytes(bytes)) =
318 metadata.get("avro.codec.compression_level")
319 {
320 Ok(Codec::Bzip2(Bzip2Settings::new(bytes[0])))
321 } else {
322 Ok(codec)
323 }
324 }
325 #[cfg(feature = "xz")]
326 Codec::Xz(_) => {
327 use crate::XzSettings;
328 if let Some(Value::Bytes(bytes)) =
329 metadata.get("avro.codec.compression_level")
330 {
331 Ok(Codec::Xz(XzSettings::new(bytes[0])))
332 } else {
333 Ok(codec)
334 }
335 }
336 #[cfg(feature = "zstandard")]
337 Codec::Zstandard(_) => {
338 use crate::ZstandardSettings;
339 if let Some(Value::Bytes(bytes)) =
340 metadata.get("avro.codec.compression_level")
341 {
342 Ok(Codec::Zstandard(ZstandardSettings::new(bytes[0])))
343 } else {
344 Ok(codec)
345 }
346 }
347 _ => Ok(codec),
348 },
349 Err(_) => Err(Details::CodecNotSupported(codec.to_owned()).into()),
350 },
351 Err(err) => Err(err),
352 });
353
354 result.unwrap_or(Ok(Codec::Null))
355}