Skip to main content

apache_avro/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    schema::{ResolvedSchema, Schema},
24    serde::ser_schema::{Config, SchemaAwareSerializer},
25    types::Value,
26    util::is_human_readable,
27};
28use serde::Serialize;
29use std::{collections::HashMap, io::Write, mem::ManuallyDrop};
30
31pub mod datum;
32pub mod single_object;
33
// Default number of buffered bytes before a data block is flushed to the sink.
const DEFAULT_BLOCK_SIZE: usize = 16000;
// Avro object-container magic: the bytes "Obj" followed by format version 1.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
36
/// Main interface for writing Avro formatted values.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
pub struct Writer<'a, W: Write> {
    // Schema used to validate and encode every appended value.
    schema: &'a Schema,
    // Destination for the encoded container-file bytes.
    writer: W,
    // Pre-resolved view of `schema` (named references resolved) used while encoding.
    resolved_schema: ResolvedSchema<'a>,
    // Compression codec applied to each data block.
    codec: Codec,
    // Buffer-size threshold (bytes) at which buffered values are flushed as a block.
    block_size: usize,
    // Encoded-but-not-yet-flushed values.
    buffer: Vec<u8>,
    // Number of values currently held in `buffer`.
    num_values: usize,
    // 16-byte synchronization marker written after every block.
    marker: [u8; 16],
    // Whether the container header has already been written (or was disabled).
    has_header: bool,
    // Custom metadata embedded in the file header.
    user_metadata: HashMap<String, Value>,
    // Hint for `Serialize` implementations choosing a human-readable representation.
    human_readable: bool,
    // Minimum block size for array/map blocks; `None` writes a single block.
    map_array_target_block_size: Option<usize>,
}
56
// `bon` generates a type-safe builder from this annotated constructor: each
// parameter below becomes a setter method on the generated builder type.
#[bon::bon]
impl<'a, W: Write> Writer<'a, W> {
    #[builder]
    pub fn builder(
        schema: &'a Schema,
        // Extra schemata used to resolve `Schema::Ref`s inside `schema`.
        schemata: Option<Vec<&'a Schema>>,
        writer: W,
        #[builder(default = Codec::Null)] codec: Codec,
        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
        #[builder(default = generate_sync_marker())] marker: [u8; 16],
        /// Has the header already been written.
        ///
        /// To disable writing the header, this can be set to `true`.
        #[builder(default = false)]
        has_header: bool,
        #[builder(default)] user_metadata: HashMap<String, Value>,
        /// Should [`Serialize`] implementations pick a human readable representation.
        ///
        /// It is recommended to set this to `false`.
        #[builder(default = is_human_readable())]
        human_readable: bool,
        /// At what block size to start a new block (for arrays and maps).
        ///
        /// This is a minimum value, the block size will always be larger than this except for the last
        /// block.
        ///
        /// When set to `None` all values will be written in a single block. This can be faster as no
        /// intermediate buffer is used, but seeking through written data will be slower.
        map_array_target_block_size: Option<usize>,
    ) -> AvroResult<Self> {
        // Resolve named-type references either against the provided schemata or
        // against `schema` itself.
        let resolved_schema = if let Some(schemata) = schemata {
            ResolvedSchema::try_from(schemata)?
        } else {
            ResolvedSchema::try_from(schema)?
        };
        Ok(Self {
            schema,
            writer,
            resolved_schema,
            codec,
            block_size,
            // Pre-allocate so a typical block never reallocates.
            buffer: Vec::with_capacity(block_size),
            num_values: 0,
            marker,
            has_header,
            user_metadata,
            human_readable,
            map_array_target_block_size,
        })
    }
}
108
109impl<'a, W: Write> Writer<'a, W> {
110    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
111    /// to.
112    /// No compression `Codec` will be used.
113    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
114        Writer::with_codec(schema, writer, Codec::Null)
115    }
116
117    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
118    /// `io::Write` trait to write to.
119    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
120        Self::builder()
121            .schema(schema)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
128    /// `io::Write` trait to write to.
129    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
130    /// be provided in `schemata`.
131    pub fn with_schemata(
132        schema: &'a Schema,
133        schemata: Vec<&'a Schema>,
134        writer: W,
135        codec: Codec,
136    ) -> AvroResult<Self> {
137        Self::builder()
138            .schema(schema)
139            .schemata(schemata)
140            .writer(writer)
141            .codec(codec)
142            .build()
143    }
144
145    /// Creates a `Writer` that will append values to already populated
146    /// `std::io::Write` using the provided `marker`
147    /// No compression `Codec` will be used.
148    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
149        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
150    }
151
152    /// Creates a `Writer` that will append values to already populated
153    /// `std::io::Write` using the provided `marker`
154    pub fn append_to_with_codec(
155        schema: &'a Schema,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .writer(writer)
163            .codec(codec)
164            .marker(marker)
165            .has_header(true)
166            .build()
167    }
168
169    /// Creates a `Writer` that will append values to already populated
170    /// `std::io::Write` using the provided `marker`
171    pub fn append_to_with_codec_schemata(
172        schema: &'a Schema,
173        schemata: Vec<&'a Schema>,
174        writer: W,
175        codec: Codec,
176        marker: [u8; 16],
177    ) -> AvroResult<Self> {
178        Self::builder()
179            .schema(schema)
180            .schemata(schemata)
181            .writer(writer)
182            .codec(codec)
183            .marker(marker)
184            .has_header(true)
185            .build()
186    }
187
188    /// Get a reference to the `Schema` associated to a `Writer`.
189    pub fn schema(&self) -> &'a Schema {
190        self.schema
191    }
192
    /// Deprecated. Use [`Writer::append_value`] instead.
    #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
        // Kept for backwards compatibility; forwards directly to the replacement.
        self.append_value(value)
    }
198
199    /// Append a value to the `Writer`, also performs schema validation.
200    ///
201    /// Returns the number of bytes written (it might be 0, see below).
202    ///
203    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
204    /// internal buffering for performance reasons. If you want to be sure the value has been
205    /// written, then call [`flush`](Writer::flush).
206    pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
207        let avro = value.into();
208        self.append_value_ref(&avro)
209    }
210
211    /// Append a compatible value to a `Writer`, also performs schema validation.
212    ///
213    /// Returns the number of bytes written (it might be 0, see below).
214    ///
215    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
216    /// internal buffering for performance reasons. If you want to be sure the value has been
217    /// written, then call [`flush`](Writer::flush).
218    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
219        if let Some(reason) = value.validate_internal(
220            self.schema,
221            self.resolved_schema.get_names(),
222            self.schema.namespace(),
223        ) {
224            return Err(Details::ValidationWithReason {
225                value: value.clone(),
226                schema: self.schema.clone(),
227                reason,
228            }
229            .into());
230        }
231        self.unvalidated_append_value_ref(value)
232    }
233
234    /// Append a compatible value to a `Writer`.
235    ///
236    /// This function does **not** validate that the provided value matches the schema. If it does
237    /// not match, the file will contain corrupt data. Use [`Writer::append_value`] to have the
238    /// value validated during write or use [`Value::validate`] to validate the value.
239    ///
240    /// Returns the number of bytes written (it might be 0, see below).
241    ///
242    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
243    /// internal buffering for performance reasons. If you want to be sure the value has been
244    /// written, then call [`flush`](Writer::flush).
245    pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
246        let value = value.into();
247        self.unvalidated_append_value_ref(&value)
248    }
249
250    /// Append a compatible value to a `Writer`.
251    ///
252    /// This function does **not** validate that the provided value matches the schema. If it does
253    /// not match, the file will contain corrupt data. Use [`Writer::append_value_ref`] to have the
254    /// value validated during write or use [`Value::validate`] to validate the value.
255    ///
256    /// Returns the number of bytes written (it might be 0, see below).
257    ///
258    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
259    /// internal buffering for performance reasons. If you want to be sure the value has been
260    /// written, then call [`flush`](Writer::flush).
261    pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
262        let n = self.maybe_write_header()?;
263        encode_internal(
264            value,
265            self.schema,
266            self.resolved_schema.get_names(),
267            self.schema.namespace(),
268            &mut self.buffer,
269        )?;
270
271        self.num_values += 1;
272
273        if self.buffer.len() >= self.block_size {
274            return self.flush().map(|b| b + n);
275        }
276
277        Ok(n)
278    }
279
280    /// Append anything implementing the `Serialize` trait to a `Writer` for
281    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
282    /// validation.
283    ///
284    /// Returns the number of bytes written.
285    ///
286    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
287    /// internal buffering for performance reasons. If you want to be sure the value has been
288    /// written, then call [`flush`](Writer::flush).
289    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
290        let n = self.maybe_write_header()?;
291
292        let config = Config {
293            names: self.resolved_schema.get_names(),
294            target_block_size: self.map_array_target_block_size,
295            human_readable: self.human_readable,
296        };
297
298        value.serialize(SchemaAwareSerializer::new(
299            &mut self.buffer,
300            self.schema,
301            config,
302        )?)?;
303        self.num_values += 1;
304
305        if self.buffer.len() >= self.block_size {
306            return self.flush().map(|b| b + n);
307        }
308
309        Ok(n)
310    }
311
312    /// Extend a `Writer` with an `Iterator` of values, also performs schema validation.
313    ///
314    /// Returns the number of bytes written.
315    ///
316    /// **NOTE**: This function forces the written data to be flushed (an implicit
317    /// call to [`flush`](Writer::flush) is performed).
318    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
319    where
320        I: IntoIterator<Item = T>,
321    {
322        /*
323        https://github.com/rust-lang/rfcs/issues/811 :(
324        let mut stream = values
325            .filter_map(|value| value.serialize(&mut self.serializer).ok())
326            .map(|value| value.encode(self.schema))
327            .collect::<Option<Vec<_>>>()
328            .ok_or_else(|| err_msg("value does not match given schema"))?
329            .into_iter()
330            .fold(Vec::new(), |mut acc, stream| {
331                num_values += 1;
332                acc.extend(stream); acc
333            });
334        */
335
336        let mut num_bytes = 0;
337        for value in values {
338            num_bytes += self.append_value(value)?;
339        }
340        num_bytes += self.flush()?;
341
342        Ok(num_bytes)
343    }
344
345    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
346    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
347    /// validation.
348    ///
349    /// Returns the number of bytes written.
350    ///
351    /// **NOTE**: This function forces the written data to be flushed (an implicit
352    /// call to [`flush`](Writer::flush) is performed).
353    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
354    where
355        I: IntoIterator<Item = T>,
356    {
357        /*
358        https://github.com/rust-lang/rfcs/issues/811 :(
359        let mut stream = values
360            .filter_map(|value| value.serialize(&mut self.serializer).ok())
361            .map(|value| value.encode(self.schema))
362            .collect::<Option<Vec<_>>>()
363            .ok_or_else(|| err_msg("value does not match given schema"))?
364            .into_iter()
365            .fold(Vec::new(), |mut acc, stream| {
366                num_values += 1;
367                acc.extend(stream); acc
368            });
369        */
370
371        let mut num_bytes = 0;
372        for value in values {
373            num_bytes += self.append_ser(value)?;
374        }
375        num_bytes += self.flush()?;
376
377        Ok(num_bytes)
378    }
379
380    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
381    /// validation on each value appended.
382    ///
383    /// Returns the number of bytes written.
384    ///
385    /// **NOTE**: This function forces the written data to be flushed (an implicit
386    /// call to [`flush`](Writer::flush) is performed).
387    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
388        let mut num_bytes = 0;
389        for value in values {
390            num_bytes += self.append_value_ref(value)?;
391        }
392        num_bytes += self.flush()?;
393
394        Ok(num_bytes)
395    }
396
397    /// Flush the content to the inner `Writer`.
398    ///
399    /// Call this function to make sure all the content has been written before releasing the `Writer`.
400    /// This will also write the header if it wasn't written yet and hasn't been disabled using
401    /// [`WriterBuilder::has_header`].
402    ///
403    /// Returns the number of bytes written.
404    pub fn flush(&mut self) -> AvroResult<usize> {
405        let mut num_bytes = self.maybe_write_header()?;
406        if self.num_values == 0 {
407            return Ok(num_bytes);
408        }
409
410        self.codec.compress(&mut self.buffer)?;
411
412        let num_values = self.num_values;
413        let stream_len = self.buffer.len();
414
415        num_bytes += self.append_raw(&num_values.try_into()?, &Schema::Long)?
416            + self.append_raw(&stream_len.try_into()?, &Schema::Long)?
417            + self
418                .writer
419                .write(self.buffer.as_ref())
420                .map_err(Details::WriteBytes)?
421            + self.append_marker()?;
422
423        self.buffer.clear();
424        self.num_values = 0;
425
426        self.writer.flush().map_err(Details::FlushWriter)?;
427
428        Ok(num_bytes)
429    }
430
    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// **NOTE**: This function forces the written data to be flushed (an implicit
    /// call to [`flush`](Writer::flush) is performed).
    pub fn into_inner(mut self) -> AvroResult<W> {
        // `flush` also writes a pending header; the explicit call is a harmless
        // no-op that keeps the intent visible.
        self.maybe_write_header()?;
        self.flush()?;

        // `Drop for Writer` would flush again; ManuallyDrop lets us dismantle
        // the struct and move the inner writer out without running it.
        let mut this = ManuallyDrop::new(self);

        // Extract every member that is not Copy and therefore should be dropped
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);
        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
452
453    /// Gets a reference to the underlying writer.
454    ///
455    /// **NOTE**: There is likely data still in the buffer. To have all the data
456    /// in the writer call [`flush`](Writer::flush) first.
457    pub fn get_ref(&self) -> &W {
458        &self.writer
459    }
460
461    /// Gets a mutable reference to the underlying writer.
462    ///
463    /// It is inadvisable to directly write to the underlying writer.
464    ///
465    /// **NOTE**: There is likely data still in the buffer. To have all the data
466    /// in the writer call [`flush`](Writer::flush) first.
467    pub fn get_mut(&mut self) -> &mut W {
468        &mut self.writer
469    }
470
471    /// Generate and append synchronization marker to the payload.
472    fn append_marker(&mut self) -> AvroResult<usize> {
473        // using .writer.write directly to avoid mutable borrow of self
474        // with ref borrowing of self.marker
475        self.writer
476            .write(&self.marker)
477            .map_err(|e| Details::WriteMarker(e).into())
478    }
479
480    /// Append a raw Avro Value to the payload avoiding to encode it again.
481    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
482        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
483    }
484
485    /// Append pure bytes to the payload.
486    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
487        self.writer
488            .write(bytes)
489            .map_err(|e| Details::WriteBytes(e).into())
490    }
491
492    /// Adds custom metadata to the file.
493    /// This method could be used only before adding the first record to the writer.
494    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
495        if !self.has_header {
496            if key.starts_with("avro.") {
497                return Err(Details::InvalidMetadataKey(key).into());
498            }
499            self.user_metadata
500                .insert(key, Value::Bytes(value.as_ref().to_vec()));
501            Ok(())
502        } else {
503            Err(Details::FileHeaderAlreadyWritten.into())
504        }
505    }
506
    /// Create an Avro header based on schema, codec and sync marker.
    fn header(&self) -> Result<Vec<u8>, Error> {
        // The writer schema is embedded in the header as its JSON representation.
        let schema_bytes = serde_json::to_string(self.schema)
            .map_err(Details::ConvertJsonToString)?
            .into_bytes();

        let mut metadata = HashMap::with_capacity(2);
        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
        if self.codec != Codec::Null {
            metadata.insert("avro.codec", self.codec.into());
        }
        // Codecs with a configurable compression level also record that level
        // so readers/tools can see how the file was written.
        match self.codec {
            #[cfg(feature = "bzip")]
            Codec::Bzip2(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            #[cfg(feature = "xz")]
            Codec::Xz(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            #[cfg(feature = "zstandard")]
            Codec::Zstandard(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            _ => {}
        }

        // User-supplied metadata; keys were validated in `add_user_metadata`.
        for (k, v) in &self.user_metadata {
            metadata.insert(k.as_str(), v.clone());
        }

        // Header layout: magic, metadata encoded as an Avro map<bytes>, sync marker.
        let mut header = Vec::new();
        header.extend_from_slice(AVRO_OBJECT_HEADER);
        encode(
            &metadata.into(),
            &Schema::map(Schema::Bytes).build(),
            &mut header,
        )?;
        header.extend_from_slice(&self.marker);

        Ok(header)
    }
558
559    fn maybe_write_header(&mut self) -> AvroResult<usize> {
560        if !self.has_header {
561            let header = self.header()?;
562            let n = self.append_bytes(header.as_ref())?;
563            self.has_header = true;
564            Ok(n)
565        } else {
566            Ok(0)
567        }
568    }
569}
570
/// A buffer that can be cleared.
///
/// Implemented for in-memory sinks (e.g. `Vec<u8>`) so that [`Writer::reset`]
/// can discard previously written output.
pub trait Clearable {
    /// Clear the buffer.
    fn clear(&mut self);
}
576
577impl Clearable for Vec<u8> {
578    fn clear(&mut self) {
579        Vec::clear(self);
580    }
581}
582
583impl<'a, W: Clearable + Write> Writer<'a, W> {
584    /// Reset the writer.
585    ///
586    /// This will clear the underlying writer, the internal buffer, and the user metadata.
587    /// It will also generate a new sync marker.
588    ///
589    /// # Example
590    /// ```
591    /// # use apache_avro::{Writer, Schema, Error};
592    /// # let schema = Schema::Boolean;
593    /// # let values = [true, false];
594    /// # fn send(_: &Vec<u8>) {}
595    /// let mut writer = Writer::new(&schema, Vec::new())?;
596    ///
597    /// // Write some values
598    /// for value in values {
599    ///     writer.append_value(value)?;
600    /// }
601    ///
602    /// // Flush the buffer and only then do something with buffer
603    /// writer.flush()?;
604    /// send(writer.get_ref());
605    ///
606    /// // Reset the writer
607    /// writer.reset();
608    ///
609    /// // Write some values again
610    /// for value in values {
611    ///     writer.append_value(value)?;
612    /// }
613    ///
614    /// # Ok::<(), Error>(())
615    /// ```
616    ///
617    /// # Warning
618    /// Any data that has been appended but not yet flushed will be silently
619    /// discarded. Call [`flush`](Writer::flush) before `reset()` if you need
620    /// to preserve in-flight records.
621    pub fn reset(&mut self) {
622        self.buffer.clear();
623        self.writer.clear();
624        self.has_header = false;
625        self.num_values = 0;
626        self.user_metadata.clear();
627        self.marker = generate_sync_marker();
628    }
629}
630
impl<W: Write> Drop for Writer<'_, W> {
    /// Drop the writer, will try to flush ignoring any errors.
    fn drop(&mut self) {
        // Best effort only: errors cannot be reported from drop, which is why
        // callers are advised to call `flush` explicitly before dropping.
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
638
/// Generate a random 16-byte synchronization marker for the file header.
#[cfg(not(target_arch = "wasm32"))]
fn generate_sync_marker() -> [u8; 16] {
    rand::random()
}
643
/// Generate a random 16-byte synchronization marker for the file header.
///
/// On wasm32 the marker is assembled from four `quad_rand` values, each
/// contributing its big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
654
655#[cfg(test)]
656mod tests {
657    use std::{cell::RefCell, rc::Rc};
658
659    use super::*;
660    use crate::{Reader, types::Record, util::zig_i64};
661    use pretty_assertions::assert_eq;
662    use serde::{Deserialize, Serialize};
663
664    use crate::{codec::DeflateSettings, error::Details};
665    use apache_avro_test_helper::TestResult;
666
    // Length of the container magic ("Obj\x01").
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    // Record schema shared by most tests: field `a: long` (default 42) and `b: string`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
686
    // Regression test (named after avro-rs issue 220 — see test name): `flush`
    // must emit the container header even when no values were appended, unless
    // the builder was told the header already exists.
    #[test]
    fn avro_rs_220_flush_write_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        // By default flush should write the header even if nothing was added yet
        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.flush()?;
        let result = writer.into_inner()?;
        // 147 is the observed header size for SCHEMA (magic + metadata + marker).
        assert_eq!(result.len(), 147);

        // Unless the user indicates via the builder that the header has already been written
        let mut writer = Writer::builder()
            .has_header(true)
            .schema(&schema)
            .writer(Vec::new())
            .build()?;
        writer.flush()?;
        let result = writer.into_inner()?;
        assert_eq!(result.len(), 0);

        Ok(())
    }
709
    // Appends two identical records and checks that the reported byte counts
    // add up, and that the output is framed as header .. data .. 16-byte marker.
    #[test]
    fn test_writer_append() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        // Expected encoding of one record: zigzag(27), then zigzag(3) + "foo";
        // duplicated because two records were written.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
743
    // Same layout check as `test_writer_append`, but feeding both records
    // through the iterator-based `extend` (which flushes implicitly).
    #[test]
    fn test_writer_extend() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        // Expected encoding of the two identical records.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
778
    // Serde-serializable record matching SCHEMA: `a: long`, `b: string`.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
784
    // Appends one serde-serialized record and checks byte accounting plus the
    // header .. data .. marker framing of the output.
    #[test]
    fn test_writer_append_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };

        let n1 = writer.append_ser(record)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        // Expected encoding of the single record.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
817
    // Same as `test_writer_append_ser`, but writing two records through the
    // iterator-based `extend_ser` (which flushes implicitly).
    #[test]
    fn test_writer_extend_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend_ser(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        // Expected encoding of the two identical records.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
853
    // Helper: a writer over an in-memory buffer using the Deflate codec.
    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::with_codec(
            schema,
            Vec::new(),
            Codec::Deflate(DeflateSettings::default()),
        )
    }
861
    // Helper: same Deflate writer, built via the builder with a tiny block size
    // (100 bytes) so flushing kicks in early.
    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::builder()
            .writer(Vec::new())
            .schema(schema)
            .codec(Codec::Deflate(DeflateSettings::default()))
            .block_size(100)
            .build()
    }
870
    // Shared assertions for Deflate writers: byte accounting matches the output
    // length, and the output is header .. compressed data .. 16-byte marker.
    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
        let mut record = Record::new(schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        // Build the raw encoding of the two records, then compress it the same
        // way the writer does so the block payloads can be compared.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());
        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
901
902    #[test]
903    fn test_writer_with_codec() -> TestResult {
904        let schema = Schema::parse_str(SCHEMA)?;
905        let writer = make_writer_with_codec(&schema)?;
906        check_writer(writer, &schema)
907    }
908
909    #[test]
910    fn test_writer_with_builder() -> TestResult {
911        let schema = Schema::parse_str(SCHEMA)?;
912        let writer = make_writer_with_builder(&schema)?;
913        check_writer(writer, &schema)
914    }
915
    #[test]
    fn test_logical_writer() -> TestResult {
        // A union of ["null", timestamp-micros long]: the writer must encode
        // the union branch index followed by the branch's payload (if any).
        const LOGICAL_TYPE_SCHEMA: &str = r#"
        {
          "type": "record",
          "name": "logical_type_test",
          "fields": [
            {
              "name": "a",
              "type": [
                "null",
                {
                  "type": "long",
                  "logicalType": "timestamp-micros"
                }
              ]
            }
          ]
        }
        "#;
        let codec = Codec::Deflate(DeflateSettings::default());
        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
        let mut writer = Writer::builder()
            .schema(&schema)
            .codec(codec)
            .writer(Vec::new())
            .build()?;

        // Record 1: union branch 1 (the timestamp-micros long).
        let mut record1 = Record::new(&schema).unwrap();
        record1.put(
            "a",
            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
        );

        // Record 2: union branch 0 (null) — no payload follows the index.
        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", Value::Union(0, Box::new(Value::Null)));

        let n1 = writer.append_value(record1)?;
        let n2 = writer.append_value(record2)?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        // Reported byte counts must add up to the file's total size.
        assert_eq!(n1 + n2 + n3, result.len());

        // Hand-encode the expected bytes and compress with the same codec.
        let mut data = Vec::new();
        // byte indicating not null
        zig_i64(1, &mut data)?;
        zig_i64(1234, &mut data)?;

        // byte indicating null
        zig_i64(0, &mut data)?;
        codec.compress(&mut data)?;

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker (last 16 bytes are the sync marker)
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
980
981    #[test]
982    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
983        let schema = Schema::parse_str(SCHEMA)?;
984        let mut writer = Writer::new(&schema, Vec::new())?;
985
986        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
987        writer.add_user_metadata("strKey".to_string(), "strValue")?;
988        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
989        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
990
991        let mut record = Record::new(&schema).unwrap();
992        record.put("a", 27i64);
993        record.put("b", "foo");
994
995        writer.append_value(record.clone())?;
996        writer.append_value(record.clone())?;
997        writer.flush()?;
998        let result = writer.into_inner()?;
999
1000        assert_eq!(result.len(), 244);
1001
1002        Ok(())
1003    }
1004
1005    #[test]
1006    fn test_avro_3881_metadata_empty_body() -> TestResult {
1007        let schema = Schema::parse_str(SCHEMA)?;
1008        let mut writer = Writer::new(&schema, Vec::new())?;
1009        writer.add_user_metadata("a".to_string(), "b")?;
1010        let result = writer.into_inner()?;
1011
1012        let reader = Reader::builder(&result[..])
1013            .reader_schema(&schema)
1014            .build()?;
1015        let mut expected = HashMap::new();
1016        expected.insert("a".to_string(), vec![b'b']);
1017        assert_eq!(reader.user_metadata(), &expected);
1018        assert_eq!(reader.into_iter().count(), 0);
1019
1020        Ok(())
1021    }
1022
    #[test]
    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
        // Once a value has been appended the file header (which carries the
        // metadata) is flushed, so adding metadata afterwards must fail.
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        writer.append_value(record.clone())?;

        match writer
            .add_user_metadata("stringKey".to_string(), String::from("value2"))
            .map_err(Error::into_details)
        {
            // Expected: the specific `FileHeaderAlreadyWritten` error variant.
            Err(e @ Details::FileHeaderAlreadyWritten) => {
                assert_eq!(e.to_string(), "The file metadata is already flushed.");
            }
            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
        }

        Ok(())
    }
1046
    #[test]
    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
        // Metadata keys beginning with "avro." are reserved for internal use
        // and must be rejected with `InvalidMetadataKey`.
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let key = "avro.stringKey".to_string();
        match writer
            .add_user_metadata(key.clone(), "value")
            .map_err(Error::into_details)
        {
            // Expected: the specific `InvalidMetadataKey` error variant with a
            // message naming the offending key.
            Err(ref e @ Details::InvalidMetadataKey(_)) => {
                assert_eq!(
                    e.to_string(),
                    format!(
                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
                    )
                );
            }
            Err(e) => panic!(
                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
            ),
            Ok(_) => {
                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
            }
        }

        Ok(())
    }
1075
1076    #[test]
1077    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1078        let schema = Schema::parse_str(SCHEMA)?;
1079
1080        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1081        user_meta_data.insert(
1082            "stringKey".to_string(),
1083            Value::String("stringValue".to_string()),
1084        );
1085        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1086        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1087
1088        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1089            .writer(Vec::new())
1090            .schema(&schema)
1091            .user_metadata(user_meta_data.clone())
1092            .build()?;
1093
1094        assert_eq!(writer.user_metadata, user_meta_data);
1095
1096        Ok(())
1097    }
1098
    #[test]
    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
        // The struct field is named `time`, which is only an alias of the
        // schema field `date`; serialization must resolve it via the alias.
        const SCHEMA: &str = r#"
  {
      "type": "record",
      "name": "Conference",
      "fields": [
          {"type": "string", "name": "name"},
          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
      ]
  }"#;

        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
        pub struct Conference {
            pub name: String,
            pub time: Option<i64>,
        }

        let conf = Conference {
            name: "RustConf".to_string(),
            time: Some(1234567890),
        };

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let bytes = writer.append_ser(conf)?;

        // Expected total bytes written (header + record) for this input.
        assert_eq!(182, bytes);

        Ok(())
    }
1131
    #[test]
    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
        // Serializing a struct whose field type disagrees with the schema
        // must fail with an error that names the field and both types.
        const SCHEMA: &str = r#"
  {
      "type": "record",
      "name": "Conference",
      "fields": [
          {"type": "string", "name": "name"},
          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
      ]
  }"#;

        #[derive(Debug, PartialEq, Clone, Serialize)]
        pub struct Conference {
            pub name: String,
            pub time: Option<f64>, // wrong type: f64 instead of i64
        }

        let conf = Conference {
            name: "RustConf".to_string(),
            time: Some(12345678.90),
        };

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        match writer.append_ser(conf) {
            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
            Err(e) => {
                // Full error text, pinned verbatim to catch regressions in
                // the error's level of detail.
                assert_eq!(
                    e.to_string(),
                    r#"Failed to serialize field 'date' of record RecordSchema { name: Name { name: "Conference", .. }, fields: [RecordField { name: "name", schema: String, .. }, RecordField { name: "date", aliases: ["time2", "time"], schema: Union(UnionSchema { schemas: [Null, Long] }), .. }], .. }: Failed to serialize value of type `f64`: Expected Schema::Double"#
                );
            }
        }
        Ok(())
    }
1169
    #[test]
    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
        // `Writer::flush` must also flush the wrapped writer (here a
        // `BufWriter`), so the bytes reach the underlying shared buffer.
        const SCHEMA: &str = r#"
        {
            "type": "record",
            "name": "ExampleSchema",
            "fields": [
                {"name": "exampleField", "type": "string"}
            ]
        }
        "#;

        // Cloneable byte sink so the test can observe what the `BufWriter`
        // actually forwarded.
        #[derive(Clone, Default)]
        struct TestBuffer(Rc<RefCell<Vec<u8>>>);

        impl TestBuffer {
            fn len(&self) -> usize {
                self.0.borrow().len()
            }
        }

        impl Write for TestBuffer {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                self.0.borrow_mut().write(buf)
            }

            fn flush(&mut self) -> std::io::Result<()> {
                Ok(())
            }
        }

        let shared_buffer = TestBuffer::default();

        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());

        let schema = Schema::parse_str(SCHEMA)?;

        let mut writer = Writer::new(&schema, buffered_writer)?;

        let mut record = Record::new(writer.schema()).unwrap();
        record.put("exampleField", "value");

        writer.append_value(record)?;
        writer.flush()?;

        // The entire encoded file (151 bytes for this input) must have been
        // pushed through the BufWriter into the shared buffer.
        assert_eq!(
            shared_buffer.len(),
            151,
            "the test buffer was not fully written to after Writer::flush was called"
        );

        Ok(())
    }
1223
    #[test]
    fn avro_rs_310_append_unvalidated_value() -> TestResult {
        // An Int value does not validate against a String schema: the
        // `unvalidated_*` appenders must still write it, while the validating
        // appenders must reject it with a descriptive error.
        let schema = Schema::String;
        let value = Value::Int(1);

        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.unvalidated_append_value_ref(&value)?;
        writer.unvalidated_append_value(value)?;
        let buffer = writer.into_inner()?;

        // Check the last two bytes for the sync marker
        // (zig-zag 1 encodes as 0x02, written once per appended value).
        assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);

        let mut writer = Writer::new(&schema, Vec::new())?;
        let value = Value::Int(1);
        let err = writer.append_value_ref(&value).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
        );
        let err = writer.append_value(value).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
        );

        Ok(())
    }
1252
    #[test]
    fn avro_rs_469_reset_writer() -> TestResult {
        // `reset` must clear the inner buffer and allow the writer to produce
        // a second, equivalent file — identical except for the (random) sync
        // markers.
        let schema = Schema::Boolean;
        let values = [true, false, true, false];
        let mut writer = Writer::new(&schema, Vec::new())?;

        for value in values {
            writer.append_value(value)?;
        }

        writer.flush()?;
        let first_buffer = writer.get_ref().clone();

        writer.reset();
        // After reset the inner buffer must be empty.
        assert_eq!(writer.get_ref().len(), 0);

        for value in values {
            writer.append_value(value)?;
        }

        writer.flush()?;
        let second_buffer = writer.get_ref().clone();
        assert_eq!(first_buffer.len(), second_buffer.len());
        // File structure:
        // Header: ? bytes
        // Sync marker: 16 bytes
        // Data: 6 bytes
        // Sync marker: 16 bytes
        let len = first_buffer.len();
        // Offsets derived from the fixed trailing layout above.
        let header = len - 16 - 6 - 16;
        let data = header + 16;
        assert_eq!(
            first_buffer[..header],
            second_buffer[..header],
            "Written header must be the same, excluding sync marker"
        );
        assert_ne!(
            first_buffer[header..data],
            second_buffer[header..data],
            "Sync markers should be different"
        );
        assert_eq!(
            first_buffer[data..data + 6],
            second_buffer[data..data + 6],
            "Written data must be the same"
        );
        assert_ne!(
            first_buffer[len - 16..],
            second_buffer[len - 16..],
            "Sync markers should be different"
        );

        Ok(())
    }
1307}