apache_avro/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    schema::{ResolvedSchema, Schema},
24    serde::ser_schema::SchemaAwareWriteSerializer,
25    types::Value,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, mem::ManuallyDrop};
29
30pub mod datum;
31pub mod single_object;
32
/// Default size (in bytes) the uncompressed value buffer may reach before a
/// data block is flushed to the underlying writer.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Avro object container file magic: the ASCII bytes "Obj" followed by the
/// format version byte `0x01`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Main interface for writing Avro formatted values.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
pub struct Writer<'a, W: Write> {
    /// The writer schema every appended value must conform to.
    schema: &'a Schema,
    /// The underlying sink the encoded container file is written to.
    writer: W,
    /// `schema` (or the provided schemata) with `Schema::Ref`s resolved;
    /// used for validation and encoding.
    resolved_schema: ResolvedSchema<'a>,
    /// Compression codec applied to each data block before it is written out.
    codec: Codec,
    /// Once `buffer` reaches this many bytes, the current block is flushed.
    block_size: usize,
    /// In-memory buffer holding encoded (not yet compressed) values of the current block.
    buffer: Vec<u8>,
    /// Number of values currently encoded into `buffer`.
    num_values: usize,
    /// 16-byte synchronization marker written after the header and after each data block.
    marker: [u8; 16],
    /// Whether the file header has already been written (or header writing was disabled).
    has_header: bool,
    /// Custom header metadata; keys must not start with "avro." (reserved).
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
    /// Get a reference to the `Schema` associated to a `Writer`.
    ///
    /// The returned reference lives for `'a`, not just for the borrow of `self`.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }
174
    /// Deprecated. Use [`Writer::append_value`] instead.
    ///
    /// Kept only for backward compatibility; it forwards directly to
    /// [`Writer::append_value`] with identical semantics.
    #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
        self.append_value(value)
    }
180
181    /// Append a value to the `Writer`, also performs schema validation.
182    ///
183    /// Returns the number of bytes written (it might be 0, see below).
184    ///
185    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
186    /// internal buffering for performance reasons. If you want to be sure the value has been
187    /// written, then call [`flush`](Writer::flush).
188    pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189        let avro = value.into();
190        self.append_value_ref(&avro)
191    }
192
193    /// Append a compatible value to a `Writer`, also performs schema validation.
194    ///
195    /// Returns the number of bytes written (it might be 0, see below).
196    ///
197    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
198    /// internal buffering for performance reasons. If you want to be sure the value has been
199    /// written, then call [`flush`](Writer::flush).
200    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201        if let Some(reason) = value.validate_internal(
202            self.schema,
203            self.resolved_schema.get_names(),
204            self.schema.namespace(),
205        ) {
206            return Err(Details::ValidationWithReason {
207                value: value.clone(),
208                schema: self.schema.clone(),
209                reason,
210            }
211            .into());
212        }
213        self.unvalidated_append_value_ref(value)
214    }
215
216    /// Append a compatible value to a `Writer`.
217    ///
218    /// This function does **not** validate that the provided value matches the schema. If it does
219    /// not match, the file will contain corrupt data. Use [`Writer::append_value`] to have the
220    /// value validated during write or use [`Value::validate`] to validate the value.
221    ///
222    /// Returns the number of bytes written (it might be 0, see below).
223    ///
224    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
225    /// internal buffering for performance reasons. If you want to be sure the value has been
226    /// written, then call [`flush`](Writer::flush).
227    pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228        let value = value.into();
229        self.unvalidated_append_value_ref(&value)
230    }
231
232    /// Append a compatible value to a `Writer`.
233    ///
234    /// This function does **not** validate that the provided value matches the schema. If it does
235    /// not match, the file will contain corrupt data. Use [`Writer::append_value_ref`] to have the
236    /// value validated during write or use [`Value::validate`] to validate the value.
237    ///
238    /// Returns the number of bytes written (it might be 0, see below).
239    ///
240    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
241    /// internal buffering for performance reasons. If you want to be sure the value has been
242    /// written, then call [`flush`](Writer::flush).
243    pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244        let n = self.maybe_write_header()?;
245        encode_internal(
246            value,
247            self.schema,
248            self.resolved_schema.get_names(),
249            self.schema.namespace(),
250            &mut self.buffer,
251        )?;
252
253        self.num_values += 1;
254
255        if self.buffer.len() >= self.block_size {
256            return self.flush().map(|b| b + n);
257        }
258
259        Ok(n)
260    }
261
262    /// Append anything implementing the `Serialize` trait to a `Writer` for
263    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
264    /// validation.
265    ///
266    /// Returns the number of bytes written.
267    ///
268    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
269    /// internal buffering for performance reasons. If you want to be sure the value has been
270    /// written, then call [`flush`](Writer::flush).
271    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272        let n = self.maybe_write_header()?;
273
274        let mut serializer = SchemaAwareWriteSerializer::new(
275            &mut self.buffer,
276            self.schema,
277            self.resolved_schema.get_names(),
278            None,
279        );
280        value.serialize(&mut serializer)?;
281        self.num_values += 1;
282
283        if self.buffer.len() >= self.block_size {
284            return self.flush().map(|b| b + n);
285        }
286
287        Ok(n)
288    }
289
290    /// Extend a `Writer` with an `Iterator` of values, also performs schema validation.
291    ///
292    /// Returns the number of bytes written.
293    ///
294    /// **NOTE**: This function forces the written data to be flushed (an implicit
295    /// call to [`flush`](Writer::flush) is performed).
296    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
297    where
298        I: IntoIterator<Item = T>,
299    {
300        /*
301        https://github.com/rust-lang/rfcs/issues/811 :(
302        let mut stream = values
303            .filter_map(|value| value.serialize(&mut self.serializer).ok())
304            .map(|value| value.encode(self.schema))
305            .collect::<Option<Vec<_>>>()
306            .ok_or_else(|| err_msg("value does not match given schema"))?
307            .into_iter()
308            .fold(Vec::new(), |mut acc, stream| {
309                num_values += 1;
310                acc.extend(stream); acc
311            });
312        */
313
314        let mut num_bytes = 0;
315        for value in values {
316            num_bytes += self.append_value(value)?;
317        }
318        num_bytes += self.flush()?;
319
320        Ok(num_bytes)
321    }
322
323    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
324    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
325    /// validation.
326    ///
327    /// Returns the number of bytes written.
328    ///
329    /// **NOTE**: This function forces the written data to be flushed (an implicit
330    /// call to [`flush`](Writer::flush) is performed).
331    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
332    where
333        I: IntoIterator<Item = T>,
334    {
335        /*
336        https://github.com/rust-lang/rfcs/issues/811 :(
337        let mut stream = values
338            .filter_map(|value| value.serialize(&mut self.serializer).ok())
339            .map(|value| value.encode(self.schema))
340            .collect::<Option<Vec<_>>>()
341            .ok_or_else(|| err_msg("value does not match given schema"))?
342            .into_iter()
343            .fold(Vec::new(), |mut acc, stream| {
344                num_values += 1;
345                acc.extend(stream); acc
346            });
347        */
348
349        let mut num_bytes = 0;
350        for value in values {
351            num_bytes += self.append_ser(value)?;
352        }
353        num_bytes += self.flush()?;
354
355        Ok(num_bytes)
356    }
357
358    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
359    /// validation on each value appended.
360    ///
361    /// Returns the number of bytes written.
362    ///
363    /// **NOTE**: This function forces the written data to be flushed (an implicit
364    /// call to [`flush`](Writer::flush) is performed).
365    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
366        let mut num_bytes = 0;
367        for value in values {
368            num_bytes += self.append_value_ref(value)?;
369        }
370        num_bytes += self.flush()?;
371
372        Ok(num_bytes)
373    }
374
375    /// Flush the content to the inner `Writer`.
376    ///
377    /// Call this function to make sure all the content has been written before releasing the `Writer`.
378    /// This will also write the header if it wasn't written yet and hasn't been disabled using
379    /// [`WriterBuilder::has_header`].
380    ///
381    /// Returns the number of bytes written.
382    pub fn flush(&mut self) -> AvroResult<usize> {
383        let mut num_bytes = self.maybe_write_header()?;
384        if self.num_values == 0 {
385            return Ok(num_bytes);
386        }
387
388        self.codec.compress(&mut self.buffer)?;
389
390        let num_values = self.num_values;
391        let stream_len = self.buffer.len();
392
393        num_bytes += self.append_raw(&num_values.try_into()?, &Schema::Long)?
394            + self.append_raw(&stream_len.try_into()?, &Schema::Long)?
395            + self
396                .writer
397                .write(self.buffer.as_ref())
398                .map_err(Details::WriteBytes)?
399            + self.append_marker()?;
400
401        self.buffer.clear();
402        self.num_values = 0;
403
404        self.writer.flush().map_err(Details::FlushWriter)?;
405
406        Ok(num_bytes)
407    }
408
    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// **NOTE**: This function forces the written data to be flushed (an implicit
    /// call to [`flush`](Writer::flush) is performed).
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.maybe_write_header()?;
        self.flush()?;

        // From here on `self` must not run its normal Drop: `writer` is about
        // to be moved out field-by-field below, which would otherwise
        // double-drop it (and re-run the flushing Drop impl).
        let mut this = ManuallyDrop::new(self);

        // Extract every member that is not Copy and therefore should be dropped
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);
        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
430
    /// Gets a reference to the underlying writer.
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
438
    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
448
449    /// Generate and append synchronization marker to the payload.
450    fn append_marker(&mut self) -> AvroResult<usize> {
451        // using .writer.write directly to avoid mutable borrow of self
452        // with ref borrowing of self.marker
453        self.writer
454            .write(&self.marker)
455            .map_err(|e| Details::WriteMarker(e).into())
456    }
457
458    /// Append a raw Avro Value to the payload avoiding to encode it again.
459    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
460        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
461    }
462
463    /// Append pure bytes to the payload.
464    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
465        self.writer
466            .write(bytes)
467            .map_err(|e| Details::WriteBytes(e).into())
468    }
469
470    /// Adds custom metadata to the file.
471    /// This method could be used only before adding the first record to the writer.
472    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
473        if !self.has_header {
474            if key.starts_with("avro.") {
475                return Err(Details::InvalidMetadataKey(key).into());
476            }
477            self.user_metadata
478                .insert(key, Value::Bytes(value.as_ref().to_vec()));
479            Ok(())
480        } else {
481            Err(Details::FileHeaderAlreadyWritten.into())
482        }
483    }
484
    /// Create an Avro header based on schema, codec and sync marker.
    fn header(&self) -> Result<Vec<u8>, Error> {
        // The schema is embedded in the header as its JSON representation.
        let schema_bytes = serde_json::to_string(self.schema)
            .map_err(Details::ConvertJsonToString)?
            .into_bytes();

        let mut metadata = HashMap::with_capacity(2);
        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
        // "null" is the implied default codec, so it is not recorded.
        if self.codec != Codec::Null {
            metadata.insert("avro.codec", self.codec.into());
        }
        // Codecs with a configurable compression level also record that level
        // in the header (each arm is feature-gated with its codec).
        match self.codec {
            #[cfg(feature = "bzip")]
            Codec::Bzip2(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            #[cfg(feature = "xz")]
            Codec::Xz(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            #[cfg(feature = "zstandard")]
            Codec::Zstandard(settings) => {
                metadata.insert(
                    "avro.codec.compression_level",
                    Value::Bytes(vec![settings.compression_level]),
                );
            }
            _ => {}
        }

        // User metadata goes into the same header map as the reserved keys.
        for (k, v) in &self.user_metadata {
            metadata.insert(k.as_str(), v.clone());
        }

        // Header layout: magic bytes, metadata map (map<bytes>), sync marker.
        let mut header = Vec::new();
        header.extend_from_slice(AVRO_OBJECT_HEADER);
        encode(
            &metadata.into(),
            &Schema::map(Schema::Bytes).build(),
            &mut header,
        )?;
        header.extend_from_slice(&self.marker);

        Ok(header)
    }
536
537    fn maybe_write_header(&mut self) -> AvroResult<usize> {
538        if !self.has_header {
539            let header = self.header()?;
540            let n = self.append_bytes(header.as_ref())?;
541            self.has_header = true;
542            Ok(n)
543        } else {
544            Ok(0)
545        }
546    }
547}
548
/// A buffer that can be cleared.
///
/// Implemented by sinks that [`Writer::reset`] can empty in place
/// (see the `impl` for `Vec<u8>` below).
pub trait Clearable {
    /// Clear the buffer.
    fn clear(&mut self);
}
554
555impl Clearable for Vec<u8> {
556    fn clear(&mut self) {
557        Vec::clear(self);
558    }
559}
560
impl<'a, W: Clearable + Write> Writer<'a, W> {
    /// Reset the writer.
    ///
    /// This will clear the underlying writer, the internal buffer, and the user metadata.
    /// It will also generate a new sync marker.
    ///
    /// # Example
    /// ```
    /// # use apache_avro::{Writer, Schema, Error};
    /// # let schema = Schema::Boolean;
    /// # let values = [true, false];
    /// # fn send(_: &Vec<u8>) {}
    /// let mut writer = Writer::new(&schema, Vec::new())?;
    ///
    /// // Write some values
    /// for value in values {
    ///     writer.append_value(value)?;
    /// }
    ///
    /// // Flush the buffer and only then do something with buffer
    /// writer.flush()?;
    /// send(writer.get_ref());
    ///
    /// // Reset the writer
    /// writer.reset();
    ///
    /// // Write some values again
    /// for value in values {
    ///     writer.append_value(value)?;
    /// }
    ///
    /// # Ok::<(), Error>(())
    /// ```
    ///
    /// # Warning
    /// Any data that has been appended but not yet flushed will be silently
    /// discarded. Call [`flush`](Writer::flush) before `reset()` if you need
    /// to preserve in-flight records.
    pub fn reset(&mut self) {
        // Discard buffered-but-unflushed values and the sink's contents.
        self.buffer.clear();
        self.writer.clear();
        // Force the header to be rewritten on the next append/flush.
        self.has_header = false;
        self.num_values = 0;
        self.user_metadata.clear();
        // A fresh marker makes the next output independent of the previous one.
        self.marker = generate_sync_marker();
    }
}
608
impl<W: Write> Drop for Writer<'_, W> {
    /// Drop the writer, will try to flush ignoring any errors.
    ///
    /// Prefer calling [`flush`](Writer::flush) or [`Writer::into_inner`]
    /// explicitly: any I/O error occurring here is silently discarded.
    fn drop(&mut self) {
        // Best-effort only; errors cannot be propagated out of `drop`.
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
616
/// Generate a random 16-byte synchronization marker for the container file.
#[cfg(not(target_arch = "wasm32"))]
fn generate_sync_marker() -> [u8; 16] {
    rand::random()
}
621
/// Generate a random 16-byte synchronization marker for the container file.
///
/// On wasm32 the `rand` crate is not used; instead four random values from
/// `quad_rand` are big-endian-encoded to fill the 16 marker bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    std::iter::repeat_with(quad_rand::rand)
        .take(4)
        .flat_map(|i| i.to_be_bytes())
        .enumerate()
        .for_each(|(i, n)| marker[i] = n);
    marker
}
632
633#[cfg(test)]
634mod tests {
635    use std::{cell::RefCell, rc::Rc};
636
637    use super::*;
638    use crate::{Reader, types::Record, util::zig_i64};
639    use pretty_assertions::assert_eq;
640    use serde::{Deserialize, Serialize};
641
642    use crate::{codec::DeflateSettings, error::Details};
643    use apache_avro_test_helper::TestResult;
644
    // Length of the Avro magic bytes at the start of every container file.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    // Record schema shared by most tests: a `long` field `a` (with a default)
    // and a `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
664
    /// Regression test: `flush` must emit the container header even when no
    /// values were appended — unless header writing was disabled via the builder.
    #[test]
    fn avro_rs_220_flush_write_header() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;

        // By default flush should write the header even if nothing was added yet
        let mut writer = Writer::new(&schema, Vec::new())?;
        writer.flush()?;
        let result = writer.into_inner()?;
        assert_eq!(result.len(), 147);

        // Unless the user indicates via the builder that the header has already been written
        let mut writer = Writer::builder()
            .has_header(true)
            .schema(&schema)
            .writer(Vec::new())
            .build()?;
        writer.flush()?;
        let result = writer.into_inner()?;
        assert_eq!(result.len(), 0);

        Ok(())
    }
687
    /// Appending two records must produce header + encoded records + sync
    /// marker, with the reported byte count matching the output length.
    #[test]
    fn test_writer_append() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        // Hand-encode one record (zigzag long 27, string "foo"), then
        // duplicate it since the record was appended twice.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
721
    /// `extend` over an iterator of records must behave like repeated
    /// `append_value` calls followed by a flush.
    #[test]
    fn test_writer_extend() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        // Hand-encode one record, then duplicate it (two records appended).
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
756
    // Mirror of the `SCHEMA` record for serde-based tests: field `a` is the
    // long, field `b` is the string.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
762
    /// `append_ser` must encode a `Serialize` value identically to the
    /// hand-encoded record bytes.
    #[test]
    fn test_writer_append_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };

        let n1 = writer.append_ser(record)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        // Hand-encode the single record (zigzag long 27, string "foo").
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
795
    /// `extend_ser` over an iterator of `Serialize` values must behave like
    /// repeated `append_ser` calls followed by a flush.
    #[test]
    fn test_writer_extend_ser() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        let record = TestSerdeSerialize {
            a: 27,
            b: "foo".to_owned(),
        };
        let record_copy = record.clone();
        let records = vec![record, record_copy];

        let n1 = writer.extend_ser(records)?;
        let n2 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2, result.len());

        // Hand-encode one record, then duplicate it (two records appended).
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
831
    // Helper: a deflate-codec writer built through the `with_codec` constructor.
    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::with_codec(
            schema,
            Vec::new(),
            Codec::Deflate(DeflateSettings::default()),
        )
    }
839
    // Helper: a deflate-codec writer built through the builder, with a small
    // block size (100 bytes) to exercise mid-stream flushing.
    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
        Writer::builder()
            .writer(Vec::new())
            .schema(schema)
            .codec(Codec::Deflate(DeflateSettings::default()))
            .block_size(100)
            .build()
    }
848
    /// Shared assertions for a deflate-codec writer: append two records, flush,
    /// then verify byte counts, the magic bytes, and the compressed payload
    /// immediately before the trailing sync marker.
    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
        let mut record = Record::new(schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");

        let n1 = writer.append_value(record.clone())?;
        let n2 = writer.append_value(record.clone())?;
        let n3 = writer.flush()?;
        let result = writer.into_inner()?;

        assert_eq!(n1 + n2 + n3, result.len());

        // Hand-encode both records, then compress with the same codec the
        // writer used so the payload bytes can be compared directly.
        let mut data = Vec::new();
        zig_i64(27, &mut data)?;
        zig_i64(3, &mut data)?;
        data.extend(b"foo");
        data.extend(data.clone());
        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;

        // starts with magic
        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
        // ends with data and sync marker
        let last_data_byte = result.len() - 16;
        assert_eq!(
            &result[last_data_byte - data.len()..last_data_byte],
            data.as_slice()
        );

        Ok(())
    }
879
    /// Deflate writer built via `with_codec` passes the shared checks.
    #[test]
    fn test_writer_with_codec() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let writer = make_writer_with_codec(&schema)?;
        check_writer(writer, &schema)
    }
886
    /// Deflate writer built via the builder (small block size) passes the
    /// same shared checks as the `with_codec` variant.
    #[test]
    fn test_writer_with_builder() -> TestResult {
        let schema = Schema::parse_str(SCHEMA)?;
        let writer = make_writer_with_builder(&schema)?;
        check_writer(writer, &schema)
    }
893
894    #[test]
895    fn test_logical_writer() -> TestResult {
896        const LOGICAL_TYPE_SCHEMA: &str = r#"
897        {
898          "type": "record",
899          "name": "logical_type_test",
900          "fields": [
901            {
902              "name": "a",
903              "type": [
904                "null",
905                {
906                  "type": "long",
907                  "logicalType": "timestamp-micros"
908                }
909              ]
910            }
911          ]
912        }
913        "#;
914        let codec = Codec::Deflate(DeflateSettings::default());
915        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
916        let mut writer = Writer::builder()
917            .schema(&schema)
918            .codec(codec)
919            .writer(Vec::new())
920            .build()?;
921
922        let mut record1 = Record::new(&schema).unwrap();
923        record1.put(
924            "a",
925            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
926        );
927
928        let mut record2 = Record::new(&schema).unwrap();
929        record2.put("a", Value::Union(0, Box::new(Value::Null)));
930
931        let n1 = writer.append_value(record1)?;
932        let n2 = writer.append_value(record2)?;
933        let n3 = writer.flush()?;
934        let result = writer.into_inner()?;
935
936        assert_eq!(n1 + n2 + n3, result.len());
937
938        let mut data = Vec::new();
939        // byte indicating not null
940        zig_i64(1, &mut data)?;
941        zig_i64(1234, &mut data)?;
942
943        // byte indicating null
944        zig_i64(0, &mut data)?;
945        codec.compress(&mut data)?;
946
947        // starts with magic
948        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
949        // ends with data and sync marker
950        let last_data_byte = result.len() - 16;
951        assert_eq!(
952            &result[last_data_byte - data.len()..last_data_byte],
953            data.as_slice()
954        );
955
956        Ok(())
957    }
958
959    #[test]
960    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
961        let schema = Schema::parse_str(SCHEMA)?;
962        let mut writer = Writer::new(&schema, Vec::new())?;
963
964        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
965        writer.add_user_metadata("strKey".to_string(), "strValue")?;
966        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
967        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
968
969        let mut record = Record::new(&schema).unwrap();
970        record.put("a", 27i64);
971        record.put("b", "foo");
972
973        writer.append_value(record.clone())?;
974        writer.append_value(record.clone())?;
975        writer.flush()?;
976        let result = writer.into_inner()?;
977
978        assert_eq!(result.len(), 244);
979
980        Ok(())
981    }
982
983    #[test]
984    fn test_avro_3881_metadata_empty_body() -> TestResult {
985        let schema = Schema::parse_str(SCHEMA)?;
986        let mut writer = Writer::new(&schema, Vec::new())?;
987        writer.add_user_metadata("a".to_string(), "b")?;
988        let result = writer.into_inner()?;
989
990        let reader = Reader::builder(&result[..])
991            .reader_schema(&schema)
992            .build()?;
993        let mut expected = HashMap::new();
994        expected.insert("a".to_string(), vec![b'b']);
995        assert_eq!(reader.user_metadata(), &expected);
996        assert_eq!(reader.into_iter().count(), 0);
997
998        Ok(())
999    }
1000
1001    #[test]
1002    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1003        let schema = Schema::parse_str(SCHEMA)?;
1004        let mut writer = Writer::new(&schema, Vec::new())?;
1005
1006        let mut record = Record::new(&schema).unwrap();
1007        record.put("a", 27i64);
1008        record.put("b", "foo");
1009        writer.append_value(record.clone())?;
1010
1011        match writer
1012            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1013            .map_err(Error::into_details)
1014        {
1015            Err(e @ Details::FileHeaderAlreadyWritten) => {
1016                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1017            }
1018            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1019            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1020        }
1021
1022        Ok(())
1023    }
1024
1025    #[test]
1026    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1027        let schema = Schema::parse_str(SCHEMA)?;
1028        let mut writer = Writer::new(&schema, Vec::new())?;
1029
1030        let key = "avro.stringKey".to_string();
1031        match writer
1032            .add_user_metadata(key.clone(), "value")
1033            .map_err(Error::into_details)
1034        {
1035            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1036                assert_eq!(
1037                    e.to_string(),
1038                    format!(
1039                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1040                    )
1041                )
1042            }
1043            Err(e) => panic!(
1044                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1045            ),
1046            Ok(_) => {
1047                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1048            }
1049        }
1050
1051        Ok(())
1052    }
1053
1054    #[test]
1055    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1056        let schema = Schema::parse_str(SCHEMA)?;
1057
1058        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1059        user_meta_data.insert(
1060            "stringKey".to_string(),
1061            Value::String("stringValue".to_string()),
1062        );
1063        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1064        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1065
1066        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1067            .writer(Vec::new())
1068            .schema(&schema)
1069            .user_metadata(user_meta_data.clone())
1070            .build()?;
1071
1072        assert_eq!(writer.user_metadata, user_meta_data);
1073
1074        Ok(())
1075    }
1076
1077    #[test]
1078    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1079        const SCHEMA: &str = r#"
1080  {
1081      "type": "record",
1082      "name": "Conference",
1083      "fields": [
1084          {"type": "string", "name": "name"},
1085          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1086      ]
1087  }"#;
1088
1089        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1090        pub struct Conference {
1091            pub name: String,
1092            pub time: Option<i64>,
1093        }
1094
1095        let conf = Conference {
1096            name: "RustConf".to_string(),
1097            time: Some(1234567890),
1098        };
1099
1100        let schema = Schema::parse_str(SCHEMA)?;
1101        let mut writer = Writer::new(&schema, Vec::new())?;
1102
1103        let bytes = writer.append_ser(conf)?;
1104
1105        assert_eq!(182, bytes);
1106
1107        Ok(())
1108    }
1109
1110    #[test]
1111    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1112        const SCHEMA: &str = r#"
1113  {
1114      "type": "record",
1115      "name": "Conference",
1116      "fields": [
1117          {"type": "string", "name": "name"},
1118          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1119      ]
1120  }"#;
1121
1122        #[derive(Debug, PartialEq, Clone, Serialize)]
1123        pub struct Conference {
1124            pub name: String,
1125            pub time: Option<f64>, // wrong type: f64 instead of i64
1126        }
1127
1128        let conf = Conference {
1129            name: "RustConf".to_string(),
1130            time: Some(12345678.90),
1131        };
1132
1133        let schema = Schema::parse_str(SCHEMA)?;
1134        let mut writer = Writer::new(&schema, Vec::new())?;
1135
1136        match writer.append_ser(conf) {
1137            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1138            Err(e) => {
1139                assert_eq!(
1140                    e.to_string(),
1141                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", .. }, fields: [RecordField { name: "name", schema: String, .. }, RecordField { name: "date", aliases: ["time2", "time"], schema: Union(UnionSchema { schemas: [Null, Long] }), .. }], .. }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long] }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1142                );
1143            }
1144        }
1145        Ok(())
1146    }
1147
1148    #[test]
1149    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1150        const SCHEMA: &str = r#"
1151        {
1152            "type": "record",
1153            "name": "ExampleSchema",
1154            "fields": [
1155                {"name": "exampleField", "type": "string"}
1156            ]
1157        }
1158        "#;
1159
1160        #[derive(Clone, Default)]
1161        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1162
1163        impl TestBuffer {
1164            fn len(&self) -> usize {
1165                self.0.borrow().len()
1166            }
1167        }
1168
1169        impl Write for TestBuffer {
1170            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1171                self.0.borrow_mut().write(buf)
1172            }
1173
1174            fn flush(&mut self) -> std::io::Result<()> {
1175                Ok(())
1176            }
1177        }
1178
1179        let shared_buffer = TestBuffer::default();
1180
1181        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1182
1183        let schema = Schema::parse_str(SCHEMA)?;
1184
1185        let mut writer = Writer::new(&schema, buffered_writer)?;
1186
1187        let mut record = Record::new(writer.schema()).unwrap();
1188        record.put("exampleField", "value");
1189
1190        writer.append_value(record)?;
1191        writer.flush()?;
1192
1193        assert_eq!(
1194            shared_buffer.len(),
1195            151,
1196            "the test buffer was not fully written to after Writer::flush was called"
1197        );
1198
1199        Ok(())
1200    }
1201
1202    #[test]
1203    fn avro_rs_310_append_unvalidated_value() -> TestResult {
1204        let schema = Schema::String;
1205        let value = Value::Int(1);
1206
1207        let mut writer = Writer::new(&schema, Vec::new())?;
1208        writer.unvalidated_append_value_ref(&value)?;
1209        writer.unvalidated_append_value(value)?;
1210        let buffer = writer.into_inner()?;
1211
1212        // Check the last two bytes for the sync marker
1213        assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);
1214
1215        let mut writer = Writer::new(&schema, Vec::new())?;
1216        let value = Value::Int(1);
1217        let err = writer.append_value_ref(&value).unwrap_err();
1218        assert_eq!(
1219            err.to_string(),
1220            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1221        );
1222        let err = writer.append_value(value).unwrap_err();
1223        assert_eq!(
1224            err.to_string(),
1225            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1226        );
1227
1228        Ok(())
1229    }
1230
1231    #[test]
1232    fn avro_rs_469_reset_writer() -> TestResult {
1233        let schema = Schema::Boolean;
1234        let values = [true, false, true, false];
1235        let mut writer = Writer::new(&schema, Vec::new())?;
1236
1237        for value in values {
1238            writer.append_value(value)?;
1239        }
1240
1241        writer.flush()?;
1242        let first_buffer = writer.get_ref().clone();
1243
1244        writer.reset();
1245        assert_eq!(writer.get_ref().len(), 0);
1246
1247        for value in values {
1248            writer.append_value(value)?;
1249        }
1250
1251        writer.flush()?;
1252        let second_buffer = writer.get_ref().clone();
1253        assert_eq!(first_buffer.len(), second_buffer.len());
1254        // File structure:
1255        // Header: ? bytes
1256        // Sync marker: 16 bytes
1257        // Data: 6 bytes
1258        // Sync marker: 16 bytes
1259        let len = first_buffer.len();
1260        let header = len - 16 - 6 - 16;
1261        let data = header + 16;
1262        assert_eq!(
1263            first_buffer[..header],
1264            second_buffer[..header],
1265            "Written header must be the same, excluding sync marker"
1266        );
1267        assert_ne!(
1268            first_buffer[header..data],
1269            second_buffer[header..data],
1270            "Sync markers should be different"
1271        );
1272        assert_eq!(
1273            first_buffer[data..data + 6],
1274            second_buffer[data..data + 6],
1275            "Written data must be the same"
1276        );
1277        assert_ne!(
1278            first_buffer[len - 16..],
1279            second_buffer[len - 16..],
1280            "Sync markers should be different"
1281        );
1282
1283        Ok(())
1284    }
1285}