apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    serde::{AvroSchema, ser_schema::SchemaAwareWriteSerializer},
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default buffer size, in bytes, at which an append triggers a flush of the current block.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes that open every Avro object container file header: `Obj` followed by version 1.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 1];
35
36/// Main interface for writing Avro formatted values.
37///
38/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
39/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
40/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
41pub struct Writer<'a, W: Write> {
42    schema: &'a Schema,
43    writer: W,
44    resolved_schema: ResolvedSchema<'a>,
45    codec: Codec,
46    block_size: usize,
47    buffer: Vec<u8>,
48    num_values: usize,
49    marker: [u8; 16],
50    has_header: bool,
51    user_metadata: HashMap<String, Value>,
52}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
170    /// Get a reference to the `Schema` associated to a `Writer`.
171    pub fn schema(&self) -> &'a Schema {
172        self.schema
173    }
174
175    /// Deprecated. Use [`Writer::append_value`] instead.
176    #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
177    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
178        self.append_value(value)
179    }
180
181    /// Append a value to the `Writer`, also performs schema validation.
182    ///
183    /// Returns the number of bytes written (it might be 0, see below).
184    ///
185    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
186    /// internal buffering for performance reasons. If you want to be sure the value has been
187    /// written, then call [`flush`](Writer::flush).
188    pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189        let avro = value.into();
190        self.append_value_ref(&avro)
191    }
192
193    /// Append a compatible value to a `Writer`, also performs schema validation.
194    ///
195    /// Returns the number of bytes written (it might be 0, see below).
196    ///
197    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
198    /// internal buffering for performance reasons. If you want to be sure the value has been
199    /// written, then call [`flush`](Writer::flush).
200    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201        if let Some(reason) = value.validate_internal(
202            self.schema,
203            self.resolved_schema.get_names(),
204            &self.schema.namespace(),
205        ) {
206            return Err(Details::ValidationWithReason {
207                value: value.clone(),
208                schema: self.schema.clone(),
209                reason,
210            }
211            .into());
212        }
213        self.unvalidated_append_value_ref(value)
214    }
215
216    /// Append a compatible value to a `Writer`.
217    ///
218    /// This function does **not** validate that the provided value matches the schema. If it does
219    /// not match, the file will contain corrupt data. Use [`Writer::append_value`] to have the
220    /// value validated during write or use [`Value::validate`] to validate the value.
221    ///
222    /// Returns the number of bytes written (it might be 0, see below).
223    ///
224    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
225    /// internal buffering for performance reasons. If you want to be sure the value has been
226    /// written, then call [`flush`](Writer::flush).
227    pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228        let value = value.into();
229        self.unvalidated_append_value_ref(&value)
230    }
231
232    /// Append a compatible value to a `Writer`.
233    ///
234    /// This function does **not** validate that the provided value matches the schema. If it does
235    /// not match, the file will contain corrupt data. Use [`Writer::append_value_ref`] to have the
236    /// value validated during write or use [`Value::validate`] to validate the value.
237    ///
238    /// Returns the number of bytes written (it might be 0, see below).
239    ///
240    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
241    /// internal buffering for performance reasons. If you want to be sure the value has been
242    /// written, then call [`flush`](Writer::flush).
243    pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244        let n = self.maybe_write_header()?;
245        encode_internal(
246            value,
247            self.schema,
248            self.resolved_schema.get_names(),
249            &self.schema.namespace(),
250            &mut self.buffer,
251        )?;
252
253        self.num_values += 1;
254
255        if self.buffer.len() >= self.block_size {
256            return self.flush().map(|b| b + n);
257        }
258
259        Ok(n)
260    }
261
262    /// Append anything implementing the `Serialize` trait to a `Writer` for
263    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
264    /// validation.
265    ///
266    /// Returns the number of bytes written.
267    ///
268    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
269    /// internal buffering for performance reasons. If you want to be sure the value has been
270    /// written, then call [`flush`](Writer::flush).
271    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272        let n = self.maybe_write_header()?;
273
274        let mut serializer = SchemaAwareWriteSerializer::new(
275            &mut self.buffer,
276            self.schema,
277            self.resolved_schema.get_names(),
278            None,
279        );
280        value.serialize(&mut serializer)?;
281        self.num_values += 1;
282
283        if self.buffer.len() >= self.block_size {
284            return self.flush().map(|b| b + n);
285        }
286
287        Ok(n)
288    }
289
290    /// Extend a `Writer` with an `Iterator` of values, also performs schema validation.
291    ///
292    /// Returns the number of bytes written.
293    ///
294    /// **NOTE**: This function forces the written data to be flushed (an implicit
295    /// call to [`flush`](Writer::flush) is performed).
296    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
297    where
298        I: IntoIterator<Item = T>,
299    {
300        /*
301        https://github.com/rust-lang/rfcs/issues/811 :(
302        let mut stream = values
303            .filter_map(|value| value.serialize(&mut self.serializer).ok())
304            .map(|value| value.encode(self.schema))
305            .collect::<Option<Vec<_>>>()
306            .ok_or_else(|| err_msg("value does not match given schema"))?
307            .into_iter()
308            .fold(Vec::new(), |mut acc, stream| {
309                num_values += 1;
310                acc.extend(stream); acc
311            });
312        */
313
314        let mut num_bytes = 0;
315        for value in values {
316            num_bytes += self.append_value(value)?;
317        }
318        num_bytes += self.flush()?;
319
320        Ok(num_bytes)
321    }
322
323    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
324    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
325    /// validation.
326    ///
327    /// Returns the number of bytes written.
328    ///
329    /// **NOTE**: This function forces the written data to be flushed (an implicit
330    /// call to [`flush`](Writer::flush) is performed).
331    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
332    where
333        I: IntoIterator<Item = T>,
334    {
335        /*
336        https://github.com/rust-lang/rfcs/issues/811 :(
337        let mut stream = values
338            .filter_map(|value| value.serialize(&mut self.serializer).ok())
339            .map(|value| value.encode(self.schema))
340            .collect::<Option<Vec<_>>>()
341            .ok_or_else(|| err_msg("value does not match given schema"))?
342            .into_iter()
343            .fold(Vec::new(), |mut acc, stream| {
344                num_values += 1;
345                acc.extend(stream); acc
346            });
347        */
348
349        let mut num_bytes = 0;
350        for value in values {
351            num_bytes += self.append_ser(value)?;
352        }
353        num_bytes += self.flush()?;
354
355        Ok(num_bytes)
356    }
357
358    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
359    /// validation on each value appended.
360    ///
361    /// Returns the number of bytes written.
362    ///
363    /// **NOTE**: This function forces the written data to be flushed (an implicit
364    /// call to [`flush`](Writer::flush) is performed).
365    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
366        let mut num_bytes = 0;
367        for value in values {
368            num_bytes += self.append_value_ref(value)?;
369        }
370        num_bytes += self.flush()?;
371
372        Ok(num_bytes)
373    }
374
375    /// Flush the content to the inner `Writer`.
376    ///
377    /// Call this function to make sure all the content has been written before releasing the `Writer`.
378    /// This will also write the header if it wasn't written yet and hasn't been disabled using
379    /// [`WriterBuilder::has_header`].
380    ///
381    /// Returns the number of bytes written.
382    pub fn flush(&mut self) -> AvroResult<usize> {
383        let mut num_bytes = self.maybe_write_header()?;
384        if self.num_values == 0 {
385            return Ok(num_bytes);
386        }
387
388        self.codec.compress(&mut self.buffer)?;
389
390        let num_values = self.num_values;
391        let stream_len = self.buffer.len();
392
393        num_bytes += self.append_raw(&num_values.try_into()?, &Schema::Long)?
394            + self.append_raw(&stream_len.try_into()?, &Schema::Long)?
395            + self
396                .writer
397                .write(self.buffer.as_ref())
398                .map_err(Details::WriteBytes)?
399            + self.append_marker()?;
400
401        self.buffer.clear();
402        self.num_values = 0;
403
404        self.writer.flush().map_err(Details::FlushWriter)?;
405
406        Ok(num_bytes)
407    }
408
409    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
410    ///
411    /// **NOTE**: This function forces the written data to be flushed (an implicit
412    /// call to [`flush`](Writer::flush) is performed).
413    pub fn into_inner(mut self) -> AvroResult<W> {
414        self.maybe_write_header()?;
415        self.flush()?;
416
417        let mut this = ManuallyDrop::new(self);
418
419        // Extract every member that is not Copy and therefore should be dropped
420        let _buffer = std::mem::take(&mut this.buffer);
421        let _user_metadata = std::mem::take(&mut this.user_metadata);
422        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
423        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
424
425        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
426        let writer = unsafe { std::ptr::read(&this.writer) };
427
428        Ok(writer)
429    }
430
431    /// Gets a reference to the underlying writer.
432    ///
433    /// **NOTE**: There is likely data still in the buffer. To have all the data
434    /// in the writer call [`flush`](Writer::flush) first.
435    pub fn get_ref(&self) -> &W {
436        &self.writer
437    }
438
439    /// Gets a mutable reference to the underlying writer.
440    ///
441    /// It is inadvisable to directly write to the underlying writer.
442    ///
443    /// **NOTE**: There is likely data still in the buffer. To have all the data
444    /// in the writer call [`flush`](Writer::flush) first.
445    pub fn get_mut(&mut self) -> &mut W {
446        &mut self.writer
447    }
448
449    /// Generate and append synchronization marker to the payload.
450    fn append_marker(&mut self) -> AvroResult<usize> {
451        // using .writer.write directly to avoid mutable borrow of self
452        // with ref borrowing of self.marker
453        self.writer
454            .write(&self.marker)
455            .map_err(|e| Details::WriteMarker(e).into())
456    }
457
458    /// Append a raw Avro Value to the payload avoiding to encode it again.
459    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
460        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
461    }
462
463    /// Append pure bytes to the payload.
464    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
465        self.writer
466            .write(bytes)
467            .map_err(|e| Details::WriteBytes(e).into())
468    }
469
470    /// Adds custom metadata to the file.
471    /// This method could be used only before adding the first record to the writer.
472    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
473        if !self.has_header {
474            if key.starts_with("avro.") {
475                return Err(Details::InvalidMetadataKey(key).into());
476            }
477            self.user_metadata
478                .insert(key, Value::Bytes(value.as_ref().to_vec()));
479            Ok(())
480        } else {
481            Err(Details::FileHeaderAlreadyWritten.into())
482        }
483    }
484
485    /// Create an Avro header based on schema, codec and sync marker.
486    fn header(&self) -> Result<Vec<u8>, Error> {
487        let schema_bytes = serde_json::to_string(self.schema)
488            .map_err(Details::ConvertJsonToString)?
489            .into_bytes();
490
491        let mut metadata = HashMap::with_capacity(2);
492        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
493        if self.codec != Codec::Null {
494            metadata.insert("avro.codec", self.codec.into());
495        }
496        match self.codec {
497            #[cfg(feature = "bzip")]
498            Codec::Bzip2(settings) => {
499                metadata.insert(
500                    "avro.codec.compression_level",
501                    Value::Bytes(vec![settings.compression_level]),
502                );
503            }
504            #[cfg(feature = "xz")]
505            Codec::Xz(settings) => {
506                metadata.insert(
507                    "avro.codec.compression_level",
508                    Value::Bytes(vec![settings.compression_level]),
509                );
510            }
511            #[cfg(feature = "zstandard")]
512            Codec::Zstandard(settings) => {
513                metadata.insert(
514                    "avro.codec.compression_level",
515                    Value::Bytes(vec![settings.compression_level]),
516                );
517            }
518            _ => {}
519        }
520
521        for (k, v) in &self.user_metadata {
522            metadata.insert(k.as_str(), v.clone());
523        }
524
525        let mut header = Vec::new();
526        header.extend_from_slice(AVRO_OBJECT_HEADER);
527        encode(
528            &metadata.into(),
529            &Schema::map(Schema::Bytes).build(),
530            &mut header,
531        )?;
532        header.extend_from_slice(&self.marker);
533
534        Ok(header)
535    }
536
537    fn maybe_write_header(&mut self) -> AvroResult<usize> {
538        if !self.has_header {
539            let header = self.header()?;
540            let n = self.append_bytes(header.as_ref())?;
541            self.has_header = true;
542            Ok(n)
543        } else {
544            Ok(0)
545        }
546    }
547}
548
/// A sink whose contents can be discarded so it can be reused from scratch.
pub trait Clearable {
    /// Remove all the content accumulated so far.
    fn clear(&mut self);
}

impl Clearable for Vec<u8> {
    /// Empties the vector while keeping its allocated capacity.
    fn clear(&mut self) {
        self.truncate(0);
    }
}
560
561impl<'a, W: Clearable + Write> Writer<'a, W> {
562    /// Reset the writer.
563    ///
564    /// This will clear the underlying writer, the internal buffer, and the user metadata.
565    /// It will also generate a new sync marker.
566    ///
567    /// # Example
568    /// ```
569    /// # use apache_avro::{Writer, Schema, Error};
570    /// # let schema = Schema::Boolean;
571    /// # let values = [true, false];
572    /// # fn send(_: &Vec<u8>) {}
573    /// let mut writer = Writer::new(&schema, Vec::new())?;
574    ///
575    /// // Write some values
576    /// for value in values {
577    ///     writer.append_value(value)?;
578    /// }
579    ///
580    /// // Flush the buffer and only then do something with buffer
581    /// writer.flush()?;
582    /// send(writer.get_ref());
583    ///
584    /// // Reset the writer
585    /// writer.reset();
586    ///
587    /// // Write some values again
588    /// for value in values {
589    ///     writer.append_value(value)?;
590    /// }
591    ///
592    /// # Ok::<(), Error>(())
593    /// ```
594    ///
595    /// # Warning
596    /// Any data that has been appended but not yet flushed will be silently
597    /// discarded. Call [`flush`](Writer::flush) before `reset()` if you need
598    /// to preserve in-flight records.
599    pub fn reset(&mut self) {
600        self.buffer.clear();
601        self.writer.clear();
602        self.has_header = false;
603        self.num_values = 0;
604        self.user_metadata.clear();
605        self.marker = generate_sync_marker();
606    }
607}
608
609impl<W: Write> Drop for Writer<'_, W> {
610    /// Drop the writer, will try to flush ignoring any errors.
611    fn drop(&mut self) {
612        let _ = self.maybe_write_header();
613        let _ = self.flush();
614    }
615}
616
617/// Encode a value into raw Avro data, also performs schema validation.
618///
619/// This is an internal function which gets the bytes buffer where to write as parameter instead of
620/// creating a new one like `to_avro_datum`.
621fn write_avro_datum<T: Into<Value>, W: Write>(
622    schema: &Schema,
623    value: T,
624    writer: &mut W,
625) -> Result<(), Error> {
626    let avro = value.into();
627    if !avro.validate(schema) {
628        return Err(Details::Validation.into());
629    }
630    encode(&avro, schema, writer)?;
631    Ok(())
632}
633
634fn write_avro_datum_schemata<T: Into<Value>>(
635    schema: &Schema,
636    schemata: Vec<&Schema>,
637    value: T,
638    buffer: &mut Vec<u8>,
639) -> AvroResult<usize> {
640    let avro = value.into();
641    let rs = ResolvedSchema::try_from(schemata)?;
642    let names = rs.get_names();
643    let enclosing_namespace = schema.namespace();
644    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
645        return Err(Details::Validation.into());
646    }
647    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
648}
649
650/// Writer that encodes messages according to the single object encoding v1 spec
651/// Uses an API similar to the current File Writer
652/// Writes all object bytes at once, and drains internal buffer
653pub struct GenericSingleObjectWriter {
654    buffer: Vec<u8>,
655    resolved: ResolvedOwnedSchema,
656}
657
658impl GenericSingleObjectWriter {
659    pub fn new_with_capacity(
660        schema: &Schema,
661        initial_buffer_cap: usize,
662    ) -> AvroResult<GenericSingleObjectWriter> {
663        let header_builder = RabinFingerprintHeader::from_schema(schema);
664        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
665    }
666
667    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
668        schema: &Schema,
669        initial_buffer_cap: usize,
670        header_builder: HB,
671    ) -> AvroResult<GenericSingleObjectWriter> {
672        let mut buffer = Vec::with_capacity(initial_buffer_cap);
673        let header = header_builder.build_header();
674        buffer.extend_from_slice(&header);
675
676        Ok(GenericSingleObjectWriter {
677            buffer,
678            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
679        })
680    }
681
682    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
683
684    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
685    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
686        let original_length = self.buffer.len();
687        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
688            Err(Details::IllegalSingleObjectWriterState.into())
689        } else {
690            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
691            writer
692                .write_all(&self.buffer)
693                .map_err(Details::WriteBytes)?;
694            let len = self.buffer.len();
695            self.buffer.truncate(original_length);
696            Ok(len)
697        }
698    }
699
700    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
701    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
702        self.write_value_ref(&v, writer)
703    }
704}
705
706/// Writer that encodes messages according to the single object encoding v1 spec
707pub struct SpecificSingleObjectWriter<T>
708where
709    T: AvroSchema,
710{
711    resolved: ResolvedOwnedSchema,
712    header: Vec<u8>,
713    _model: PhantomData<T>,
714}
715
716impl<T> SpecificSingleObjectWriter<T>
717where
718    T: AvroSchema,
719{
720    pub fn new() -> AvroResult<Self> {
721        let schema = T::get_schema();
722        let header = RabinFingerprintHeader::from_schema(&schema).build_header();
723        let resolved = ResolvedOwnedSchema::new(schema)?;
724        // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice
725        Ok(Self {
726            resolved,
727            header,
728            _model: PhantomData,
729        })
730    }
731
732    pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
733        let header = header_builder.build_header();
734        let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
735        Ok(Self {
736            resolved,
737            header,
738            _model: PhantomData,
739        })
740    }
741
742    /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead.
743    #[deprecated(since = "0.22.0", note = "Use new() instead")]
744    pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
745        Self::new()
746    }
747}
748
749impl<T> SpecificSingleObjectWriter<T>
750where
751    T: AvroSchema + Into<Value>,
752{
753    /// Write the value to the writer
754    ///
755    /// Returns the number of bytes written.
756    ///
757    /// Each call writes a complete single-object encoded message (header + data),
758    /// making each message independently decodable.
759    pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
760        writer
761            .write_all(&self.header)
762            .map_err(Details::WriteBytes)?;
763        let value: Value = data.into();
764        let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
765        Ok(bytes + self.header.len())
766    }
767}
768
769impl<T> SpecificSingleObjectWriter<T>
770where
771    T: AvroSchema + Serialize,
772{
773    /// Write the object to the writer.
774    ///
775    /// Returns the number of bytes written.
776    ///
777    /// Each call writes a complete single-object encoded message (header + data),
778    /// making each message independently decodable.
779    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
780        writer
781            .write_all(&self.header)
782            .map_err(Details::WriteBytes)?;
783
784        let bytes = write_avro_datum_ref(
785            self.resolved.get_root_schema(),
786            self.resolved.get_names(),
787            data,
788            writer,
789        )?;
790
791        Ok(bytes + self.header.len())
792    }
793
794    /// Write the object to the writer.
795    ///
796    /// Returns the number of bytes written.
797    ///
798    /// Each call writes a complete single-object encoded message (header + data),
799    /// making each message independently decodable.
800    pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
801        self.write_ref(&data, writer)
802    }
803}
804
805fn write_value_ref_owned_resolved<W: Write>(
806    resolved_schema: &ResolvedOwnedSchema,
807    value: &Value,
808    writer: &mut W,
809) -> AvroResult<usize> {
810    let root_schema = resolved_schema.get_root_schema();
811    if let Some(reason) = value.validate_internal(
812        root_schema,
813        resolved_schema.get_names(),
814        &root_schema.namespace(),
815    ) {
816        return Err(Details::ValidationWithReason {
817            value: value.clone(),
818            schema: root_schema.clone(),
819            reason,
820        }
821        .into());
822    }
823    encode_internal(
824        value,
825        root_schema,
826        resolved_schema.get_names(),
827        &root_schema.namespace(),
828        writer,
829    )
830}
831
832/// Encode a value into raw Avro data, also performs schema validation.
833///
834/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
835/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
836/// you are doing, instead.
837pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
838    let mut buffer = Vec::new();
839    write_avro_datum(schema, value, &mut buffer)?;
840    Ok(buffer)
841}
842
843/// Write the referenced [Serialize]able object to the provided [Write] object.
844///
845/// Returns a result with the number of bytes written.
846///
847/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
848/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
849/// if you don't know what you are doing, instead.
850pub fn write_avro_datum_ref<T: Serialize, W: Write>(
851    schema: &Schema,
852    names: &NamesRef,
853    data: &T,
854    writer: &mut W,
855) -> AvroResult<usize> {
856    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
857    data.serialize(&mut serializer)
858}
859
860/// Encode a value into raw Avro data, also performs schema validation.
861///
862/// If the provided `schema` is incomplete then its dependencies must be
863/// provided in `schemata`
864pub fn to_avro_datum_schemata<T: Into<Value>>(
865    schema: &Schema,
866    schemata: Vec<&Schema>,
867    value: T,
868) -> AvroResult<Vec<u8>> {
869    let mut buffer = Vec::new();
870    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
871    Ok(buffer)
872}
873
874#[cfg(not(target_arch = "wasm32"))]
875fn generate_sync_marker() -> [u8; 16] {
876    rand::random()
877}
878
/// Produces a fresh 16-byte sync marker on `wasm32` targets, using `quad_rand` instead of
/// `rand`.
///
/// The marker is filled in four 4-byte slices, in order. Each slice receives the
/// big-endian bytes of one `quad_rand::rand()` call.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
889
890#[cfg(test)]
891mod tests {
892    use std::{cell::RefCell, rc::Rc};
893
894    use super::*;
895    use crate::{
896        Reader,
897        decimal::Decimal,
898        duration::{Days, Duration, Millis, Months},
899        headers::GlueSchemaUuidHeader,
900        rabin::Rabin,
901        schema::{DecimalSchema, FixedSchema, Name},
902        types::Record,
903        util::zig_i64,
904    };
905    use pretty_assertions::assert_eq;
906    use serde::{Deserialize, Serialize};
907    use uuid::Uuid;
908
909    use crate::schema::InnerDecimalSchema;
910    use crate::{codec::DeflateSettings, error::Details};
911    use apache_avro_test_helper::TestResult;
912
913    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
914
915    const SCHEMA: &str = r#"
916    {
917      "type": "record",
918      "name": "test",
919      "fields": [
920        {
921          "name": "a",
922          "type": "long",
923          "default": 42
924        },
925        {
926          "name": "b",
927          "type": "string"
928        }
929      ]
930    }
931    "#;
932
933    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
934
935    #[test]
936    fn test_to_avro_datum() -> TestResult {
937        let schema = Schema::parse_str(SCHEMA)?;
938        let mut record = Record::new(&schema).unwrap();
939        record.put("a", 27i64);
940        record.put("b", "foo");
941
942        let mut expected = Vec::new();
943        zig_i64(27, &mut expected)?;
944        zig_i64(3, &mut expected)?;
945        expected.extend([b'f', b'o', b'o']);
946
947        assert_eq!(to_avro_datum(&schema, record)?, expected);
948
949        Ok(())
950    }
951
952    #[test]
953    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
954        #[derive(Serialize)]
955        struct TestStruct {
956            a: i64,
957            b: String,
958        }
959
960        let schema = Schema::parse_str(SCHEMA)?;
961        let mut writer: Vec<u8> = Vec::new();
962        let data = TestStruct {
963            a: 27,
964            b: "foo".to_string(),
965        };
966
967        let mut expected = Vec::new();
968        zig_i64(27, &mut expected)?;
969        zig_i64(3, &mut expected)?;
970        expected.extend([b'f', b'o', b'o']);
971
972        let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
973
974        assert_eq!(bytes, expected.len());
975        assert_eq!(writer, expected);
976
977        Ok(())
978    }
979
980    #[test]
981    fn avro_rs_220_flush_write_header() -> TestResult {
982        let schema = Schema::parse_str(SCHEMA)?;
983
984        // By default flush should write the header even if nothing was added yet
985        let mut writer = Writer::new(&schema, Vec::new())?;
986        writer.flush()?;
987        let result = writer.into_inner()?;
988        assert_eq!(result.len(), 147);
989
990        // Unless the user indicates via the builder that the header has already been written
991        let mut writer = Writer::builder()
992            .has_header(true)
993            .schema(&schema)
994            .writer(Vec::new())
995            .build()?;
996        writer.flush()?;
997        let result = writer.into_inner()?;
998        assert_eq!(result.len(), 0);
999
1000        Ok(())
1001    }
1002
1003    #[test]
1004    fn test_union_not_null() -> TestResult {
1005        let schema = Schema::parse_str(UNION_SCHEMA)?;
1006        let union = Value::Union(1, Box::new(Value::Long(3)));
1007
1008        let mut expected = Vec::new();
1009        zig_i64(1, &mut expected)?;
1010        zig_i64(3, &mut expected)?;
1011
1012        assert_eq!(to_avro_datum(&schema, union)?, expected);
1013
1014        Ok(())
1015    }
1016
1017    #[test]
1018    fn test_union_null() -> TestResult {
1019        let schema = Schema::parse_str(UNION_SCHEMA)?;
1020        let union = Value::Union(0, Box::new(Value::Null));
1021
1022        let mut expected = Vec::new();
1023        zig_i64(0, &mut expected)?;
1024
1025        assert_eq!(to_avro_datum(&schema, union)?, expected);
1026
1027        Ok(())
1028    }
1029
1030    fn logical_type_test<T: Into<Value> + Clone>(
1031        schema_str: &'static str,
1032
1033        expected_schema: &Schema,
1034        value: Value,
1035
1036        raw_schema: &Schema,
1037        raw_value: T,
1038    ) -> TestResult {
1039        let schema = Schema::parse_str(schema_str)?;
1040        assert_eq!(&schema, expected_schema);
1041        // The serialized format should be the same as the schema.
1042        let ser = to_avro_datum(&schema, value.clone())?;
1043        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
1044        assert_eq!(ser, raw_ser);
1045
1046        // Should deserialize from the schema into the logical type.
1047        let mut r = ser.as_slice();
1048        let de = crate::from_avro_datum(&schema, &mut r, None)?;
1049        assert_eq!(de, value);
1050        Ok(())
1051    }
1052
1053    #[test]
1054    fn date() -> TestResult {
1055        logical_type_test(
1056            r#"{"type": "int", "logicalType": "date"}"#,
1057            &Schema::Date,
1058            Value::Date(1_i32),
1059            &Schema::Int,
1060            1_i32,
1061        )
1062    }
1063
1064    #[test]
1065    fn time_millis() -> TestResult {
1066        logical_type_test(
1067            r#"{"type": "int", "logicalType": "time-millis"}"#,
1068            &Schema::TimeMillis,
1069            Value::TimeMillis(1_i32),
1070            &Schema::Int,
1071            1_i32,
1072        )
1073    }
1074
1075    #[test]
1076    fn time_micros() -> TestResult {
1077        logical_type_test(
1078            r#"{"type": "long", "logicalType": "time-micros"}"#,
1079            &Schema::TimeMicros,
1080            Value::TimeMicros(1_i64),
1081            &Schema::Long,
1082            1_i64,
1083        )
1084    }
1085
1086    #[test]
1087    fn timestamp_millis() -> TestResult {
1088        logical_type_test(
1089            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
1090            &Schema::TimestampMillis,
1091            Value::TimestampMillis(1_i64),
1092            &Schema::Long,
1093            1_i64,
1094        )
1095    }
1096
1097    #[test]
1098    fn timestamp_micros() -> TestResult {
1099        logical_type_test(
1100            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
1101            &Schema::TimestampMicros,
1102            Value::TimestampMicros(1_i64),
1103            &Schema::Long,
1104            1_i64,
1105        )
1106    }
1107
1108    #[test]
1109    fn decimal_fixed() -> TestResult {
1110        let size = 30;
1111        let fixed = FixedSchema {
1112            name: Name::new("decimal")?,
1113            aliases: None,
1114            doc: None,
1115            size,
1116            attributes: Default::default(),
1117        };
1118        let inner = InnerDecimalSchema::Fixed(fixed.clone());
1119        let value = vec![0u8; size];
1120        logical_type_test(
1121            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1122            &Schema::Decimal(DecimalSchema {
1123                precision: 20,
1124                scale: 5,
1125                inner,
1126            }),
1127            Value::Decimal(Decimal::from(value.clone())),
1128            &Schema::Fixed(fixed),
1129            Value::Fixed(size, value),
1130        )
1131    }
1132
1133    #[test]
1134    fn decimal_bytes() -> TestResult {
1135        let value = vec![0u8; 10];
1136        logical_type_test(
1137            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1138            &Schema::Decimal(DecimalSchema {
1139                precision: 4,
1140                scale: 3,
1141                inner: InnerDecimalSchema::Bytes,
1142            }),
1143            Value::Decimal(Decimal::from(value.clone())),
1144            &Schema::Bytes,
1145            value,
1146        )
1147    }
1148
1149    #[test]
1150    fn duration() -> TestResult {
1151        let inner = Schema::Fixed(FixedSchema {
1152            name: Name::new("duration")?,
1153            aliases: None,
1154            doc: None,
1155            size: 12,
1156            attributes: Default::default(),
1157        });
1158        let value = Value::Duration(Duration::new(
1159            Months::new(256),
1160            Days::new(512),
1161            Millis::new(1024),
1162        ));
1163        logical_type_test(
1164            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1165            &Schema::Duration(FixedSchema {
1166                name: Name::try_from("duration").expect("Name is valid"),
1167                aliases: None,
1168                doc: None,
1169                size: 12,
1170                attributes: Default::default(),
1171            }),
1172            value,
1173            &inner,
1174            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1175        )
1176    }
1177
1178    #[test]
1179    fn test_writer_append() -> TestResult {
1180        let schema = Schema::parse_str(SCHEMA)?;
1181        let mut writer = Writer::new(&schema, Vec::new())?;
1182
1183        let mut record = Record::new(&schema).unwrap();
1184        record.put("a", 27i64);
1185        record.put("b", "foo");
1186
1187        let n1 = writer.append_value(record.clone())?;
1188        let n2 = writer.append_value(record.clone())?;
1189        let n3 = writer.flush()?;
1190        let result = writer.into_inner()?;
1191
1192        assert_eq!(n1 + n2 + n3, result.len());
1193
1194        let mut data = Vec::new();
1195        zig_i64(27, &mut data)?;
1196        zig_i64(3, &mut data)?;
1197        data.extend(b"foo");
1198        data.extend(data.clone());
1199
1200        // starts with magic
1201        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1202        // ends with data and sync marker
1203        let last_data_byte = result.len() - 16;
1204        assert_eq!(
1205            &result[last_data_byte - data.len()..last_data_byte],
1206            data.as_slice()
1207        );
1208
1209        Ok(())
1210    }
1211
1212    #[test]
1213    fn test_writer_extend() -> TestResult {
1214        let schema = Schema::parse_str(SCHEMA)?;
1215        let mut writer = Writer::new(&schema, Vec::new())?;
1216
1217        let mut record = Record::new(&schema).unwrap();
1218        record.put("a", 27i64);
1219        record.put("b", "foo");
1220        let record_copy = record.clone();
1221        let records = vec![record, record_copy];
1222
1223        let n1 = writer.extend(records)?;
1224        let n2 = writer.flush()?;
1225        let result = writer.into_inner()?;
1226
1227        assert_eq!(n1 + n2, result.len());
1228
1229        let mut data = Vec::new();
1230        zig_i64(27, &mut data)?;
1231        zig_i64(3, &mut data)?;
1232        data.extend(b"foo");
1233        data.extend(data.clone());
1234
1235        // starts with magic
1236        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1237        // ends with data and sync marker
1238        let last_data_byte = result.len() - 16;
1239        assert_eq!(
1240            &result[last_data_byte - data.len()..last_data_byte],
1241            data.as_slice()
1242        );
1243
1244        Ok(())
1245    }
1246
1247    #[derive(Debug, Clone, Deserialize, Serialize)]
1248    struct TestSerdeSerialize {
1249        a: i64,
1250        b: String,
1251    }
1252
1253    #[test]
1254    fn test_writer_append_ser() -> TestResult {
1255        let schema = Schema::parse_str(SCHEMA)?;
1256        let mut writer = Writer::new(&schema, Vec::new())?;
1257
1258        let record = TestSerdeSerialize {
1259            a: 27,
1260            b: "foo".to_owned(),
1261        };
1262
1263        let n1 = writer.append_ser(record)?;
1264        let n2 = writer.flush()?;
1265        let result = writer.into_inner()?;
1266
1267        assert_eq!(n1 + n2, result.len());
1268
1269        let mut data = Vec::new();
1270        zig_i64(27, &mut data)?;
1271        zig_i64(3, &mut data)?;
1272        data.extend(b"foo");
1273
1274        // starts with magic
1275        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1276        // ends with data and sync marker
1277        let last_data_byte = result.len() - 16;
1278        assert_eq!(
1279            &result[last_data_byte - data.len()..last_data_byte],
1280            data.as_slice()
1281        );
1282
1283        Ok(())
1284    }
1285
1286    #[test]
1287    fn test_writer_extend_ser() -> TestResult {
1288        let schema = Schema::parse_str(SCHEMA)?;
1289        let mut writer = Writer::new(&schema, Vec::new())?;
1290
1291        let record = TestSerdeSerialize {
1292            a: 27,
1293            b: "foo".to_owned(),
1294        };
1295        let record_copy = record.clone();
1296        let records = vec![record, record_copy];
1297
1298        let n1 = writer.extend_ser(records)?;
1299        let n2 = writer.flush()?;
1300        let result = writer.into_inner()?;
1301
1302        assert_eq!(n1 + n2, result.len());
1303
1304        let mut data = Vec::new();
1305        zig_i64(27, &mut data)?;
1306        zig_i64(3, &mut data)?;
1307        data.extend(b"foo");
1308        data.extend(data.clone());
1309
1310        // starts with magic
1311        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1312        // ends with data and sync marker
1313        let last_data_byte = result.len() - 16;
1314        assert_eq!(
1315            &result[last_data_byte - data.len()..last_data_byte],
1316            data.as_slice()
1317        );
1318
1319        Ok(())
1320    }
1321
1322    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1323        Writer::with_codec(
1324            schema,
1325            Vec::new(),
1326            Codec::Deflate(DeflateSettings::default()),
1327        )
1328    }
1329
1330    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1331        Writer::builder()
1332            .writer(Vec::new())
1333            .schema(schema)
1334            .codec(Codec::Deflate(DeflateSettings::default()))
1335            .block_size(100)
1336            .build()
1337    }
1338
1339    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1340        let mut record = Record::new(schema).unwrap();
1341        record.put("a", 27i64);
1342        record.put("b", "foo");
1343
1344        let n1 = writer.append_value(record.clone())?;
1345        let n2 = writer.append_value(record.clone())?;
1346        let n3 = writer.flush()?;
1347        let result = writer.into_inner()?;
1348
1349        assert_eq!(n1 + n2 + n3, result.len());
1350
1351        let mut data = Vec::new();
1352        zig_i64(27, &mut data)?;
1353        zig_i64(3, &mut data)?;
1354        data.extend(b"foo");
1355        data.extend(data.clone());
1356        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1357
1358        // starts with magic
1359        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1360        // ends with data and sync marker
1361        let last_data_byte = result.len() - 16;
1362        assert_eq!(
1363            &result[last_data_byte - data.len()..last_data_byte],
1364            data.as_slice()
1365        );
1366
1367        Ok(())
1368    }
1369
1370    #[test]
1371    fn test_writer_with_codec() -> TestResult {
1372        let schema = Schema::parse_str(SCHEMA)?;
1373        let writer = make_writer_with_codec(&schema)?;
1374        check_writer(writer, &schema)
1375    }
1376
1377    #[test]
1378    fn test_writer_with_builder() -> TestResult {
1379        let schema = Schema::parse_str(SCHEMA)?;
1380        let writer = make_writer_with_builder(&schema)?;
1381        check_writer(writer, &schema)
1382    }
1383
1384    #[test]
1385    fn test_logical_writer() -> TestResult {
1386        const LOGICAL_TYPE_SCHEMA: &str = r#"
1387        {
1388          "type": "record",
1389          "name": "logical_type_test",
1390          "fields": [
1391            {
1392              "name": "a",
1393              "type": [
1394                "null",
1395                {
1396                  "type": "long",
1397                  "logicalType": "timestamp-micros"
1398                }
1399              ]
1400            }
1401          ]
1402        }
1403        "#;
1404        let codec = Codec::Deflate(DeflateSettings::default());
1405        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1406        let mut writer = Writer::builder()
1407            .schema(&schema)
1408            .codec(codec)
1409            .writer(Vec::new())
1410            .build()?;
1411
1412        let mut record1 = Record::new(&schema).unwrap();
1413        record1.put(
1414            "a",
1415            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1416        );
1417
1418        let mut record2 = Record::new(&schema).unwrap();
1419        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1420
1421        let n1 = writer.append_value(record1)?;
1422        let n2 = writer.append_value(record2)?;
1423        let n3 = writer.flush()?;
1424        let result = writer.into_inner()?;
1425
1426        assert_eq!(n1 + n2 + n3, result.len());
1427
1428        let mut data = Vec::new();
1429        // byte indicating not null
1430        zig_i64(1, &mut data)?;
1431        zig_i64(1234, &mut data)?;
1432
1433        // byte indicating null
1434        zig_i64(0, &mut data)?;
1435        codec.compress(&mut data)?;
1436
1437        // starts with magic
1438        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1439        // ends with data and sync marker
1440        let last_data_byte = result.len() - 16;
1441        assert_eq!(
1442            &result[last_data_byte - data.len()..last_data_byte],
1443            data.as_slice()
1444        );
1445
1446        Ok(())
1447    }
1448
1449    #[test]
1450    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1451        let schema = Schema::parse_str(SCHEMA)?;
1452        let mut writer = Writer::new(&schema, Vec::new())?;
1453
1454        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1455        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1456        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1457        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1458
1459        let mut record = Record::new(&schema).unwrap();
1460        record.put("a", 27i64);
1461        record.put("b", "foo");
1462
1463        writer.append_value(record.clone())?;
1464        writer.append_value(record.clone())?;
1465        writer.flush()?;
1466        let result = writer.into_inner()?;
1467
1468        assert_eq!(result.len(), 244);
1469
1470        Ok(())
1471    }
1472
1473    #[test]
1474    fn test_avro_3881_metadata_empty_body() -> TestResult {
1475        let schema = Schema::parse_str(SCHEMA)?;
1476        let mut writer = Writer::new(&schema, Vec::new())?;
1477        writer.add_user_metadata("a".to_string(), "b")?;
1478        let result = writer.into_inner()?;
1479
1480        let reader = Reader::builder(&result[..])
1481            .reader_schema(&schema)
1482            .build()?;
1483        let mut expected = HashMap::new();
1484        expected.insert("a".to_string(), vec![b'b']);
1485        assert_eq!(reader.user_metadata(), &expected);
1486        assert_eq!(reader.into_iter().count(), 0);
1487
1488        Ok(())
1489    }
1490
1491    #[test]
1492    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1493        let schema = Schema::parse_str(SCHEMA)?;
1494        let mut writer = Writer::new(&schema, Vec::new())?;
1495
1496        let mut record = Record::new(&schema).unwrap();
1497        record.put("a", 27i64);
1498        record.put("b", "foo");
1499        writer.append_value(record.clone())?;
1500
1501        match writer
1502            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1503            .map_err(Error::into_details)
1504        {
1505            Err(e @ Details::FileHeaderAlreadyWritten) => {
1506                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1507            }
1508            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1509            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1510        }
1511
1512        Ok(())
1513    }
1514
1515    #[test]
1516    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1517        let schema = Schema::parse_str(SCHEMA)?;
1518        let mut writer = Writer::new(&schema, Vec::new())?;
1519
1520        let key = "avro.stringKey".to_string();
1521        match writer
1522            .add_user_metadata(key.clone(), "value")
1523            .map_err(Error::into_details)
1524        {
1525            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1526                assert_eq!(
1527                    e.to_string(),
1528                    format!(
1529                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1530                    )
1531                )
1532            }
1533            Err(e) => panic!(
1534                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1535            ),
1536            Ok(_) => {
1537                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1538            }
1539        }
1540
1541        Ok(())
1542    }
1543
1544    #[test]
1545    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1546        let schema = Schema::parse_str(SCHEMA)?;
1547
1548        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1549        user_meta_data.insert(
1550            "stringKey".to_string(),
1551            Value::String("stringValue".to_string()),
1552        );
1553        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1554        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1555
1556        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1557            .writer(Vec::new())
1558            .schema(&schema)
1559            .user_metadata(user_meta_data.clone())
1560            .build()?;
1561
1562        assert_eq!(writer.user_metadata, user_meta_data);
1563
1564        Ok(())
1565    }
1566
1567    #[derive(Serialize, Clone)]
1568    struct TestSingleObjectWriter {
1569        a: i64,
1570        b: f64,
1571        c: Vec<String>,
1572    }
1573
1574    impl AvroSchema for TestSingleObjectWriter {
1575        fn get_schema() -> Schema {
1576            let schema = r#"
1577            {
1578                "type":"record",
1579                "name":"TestSingleObjectWrtierSerialize",
1580                "fields":[
1581                    {
1582                        "name":"a",
1583                        "type":"long"
1584                    },
1585                    {
1586                        "name":"b",
1587                        "type":"double"
1588                    },
1589                    {
1590                        "name":"c",
1591                        "type":{
1592                            "type":"array",
1593                            "items":"string"
1594                        }
1595                    }
1596                ]
1597            }
1598            "#;
1599            Schema::parse_str(schema).unwrap()
1600        }
1601    }
1602
1603    impl From<TestSingleObjectWriter> for Value {
1604        fn from(obj: TestSingleObjectWriter) -> Value {
1605            Value::Record(vec![
1606                ("a".into(), obj.a.into()),
1607                ("b".into(), obj.b.into()),
1608                (
1609                    "c".into(),
1610                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1611                ),
1612            ])
1613        }
1614    }
1615
1616    #[test]
1617    fn test_single_object_writer() -> TestResult {
1618        let mut buf: Vec<u8> = Vec::new();
1619        let obj = TestSingleObjectWriter {
1620            a: 300,
1621            b: 34.555,
1622            c: vec!["cat".into(), "dog".into()],
1623        };
1624        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1625            &TestSingleObjectWriter::get_schema(),
1626            1024,
1627        )
1628        .expect("Should resolve schema");
1629        let value = obj.into();
1630        let written_bytes = writer
1631            .write_value_ref(&value, &mut buf)
1632            .expect("Error serializing properly");
1633
1634        assert!(buf.len() > 10, "no bytes written");
1635        assert_eq!(buf.len(), written_bytes);
1636        assert_eq!(buf[0], 0xC3);
1637        assert_eq!(buf[1], 0x01);
1638        assert_eq!(
1639            &buf[2..10],
1640            &TestSingleObjectWriter::get_schema()
1641                .fingerprint::<Rabin>()
1642                .bytes[..]
1643        );
1644        let mut msg_binary = Vec::new();
1645        encode(
1646            &value,
1647            &TestSingleObjectWriter::get_schema(),
1648            &mut msg_binary,
1649        )
1650        .expect("encode should have failed by here as a dependency of any writing");
1651        assert_eq!(&buf[10..], &msg_binary[..]);
1652
1653        Ok(())
1654    }
1655
1656    #[test]
1657    fn test_single_object_writer_with_header_builder() -> TestResult {
1658        let mut buf: Vec<u8> = Vec::new();
1659        let obj = TestSingleObjectWriter {
1660            a: 300,
1661            b: 34.555,
1662            c: vec!["cat".into(), "dog".into()],
1663        };
1664        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1665        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1666        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1667            &TestSingleObjectWriter::get_schema(),
1668            1024,
1669            header_builder,
1670        )
1671        .expect("Should resolve schema");
1672        let value = obj.into();
1673        writer
1674            .write_value_ref(&value, &mut buf)
1675            .expect("Error serializing properly");
1676
1677        assert_eq!(buf[0], 0x03);
1678        assert_eq!(buf[1], 0x00);
1679        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1680        Ok(())
1681    }
1682
1683    #[test]
1684    fn test_writer_parity() -> TestResult {
1685        let obj1 = TestSingleObjectWriter {
1686            a: 300,
1687            b: 34.555,
1688            c: vec!["cat".into(), "dog".into()],
1689        };
1690
1691        let mut buf1: Vec<u8> = Vec::new();
1692        let mut buf2: Vec<u8> = Vec::new();
1693        let mut buf3: Vec<u8> = Vec::new();
1694        let mut buf4: Vec<u8> = Vec::new();
1695
1696        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1697            &TestSingleObjectWriter::get_schema(),
1698            1024,
1699        )
1700        .expect("Should resolve schema");
1701        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
1702            .expect("Resolved should pass");
1703        specific_writer
1704            .write_ref(&obj1, &mut buf1)
1705            .expect("Serialization expected");
1706        specific_writer
1707            .write_ref(&obj1, &mut buf2)
1708            .expect("Serialization expected");
1709        specific_writer
1710            .write_value(obj1.clone(), &mut buf3)
1711            .expect("Serialization expected");
1712
1713        generic_writer
1714            .write_value(obj1.into(), &mut buf4)
1715            .expect("Serialization expected");
1716
1717        assert_eq!(buf1, buf2);
1718        assert_eq!(buf2, buf3);
1719        assert_eq!(buf3, buf4);
1720
1721        Ok(())
1722    }
1723
1724    #[test]
1725    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1726        const SCHEMA: &str = r#"
1727  {
1728      "type": "record",
1729      "name": "Conference",
1730      "fields": [
1731          {"type": "string", "name": "name"},
1732          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1733      ]
1734  }"#;
1735
1736        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1737        pub struct Conference {
1738            pub name: String,
1739            pub time: Option<i64>,
1740        }
1741
1742        let conf = Conference {
1743            name: "RustConf".to_string(),
1744            time: Some(1234567890),
1745        };
1746
1747        let schema = Schema::parse_str(SCHEMA)?;
1748        let mut writer = Writer::new(&schema, Vec::new())?;
1749
1750        let bytes = writer.append_ser(conf)?;
1751
1752        assert_eq!(182, bytes);
1753
1754        Ok(())
1755    }
1756
1757    #[test]
1758    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1759        const SCHEMA: &str = r#"
1760  {
1761      "type": "record",
1762      "name": "Conference",
1763      "fields": [
1764          {"type": "string", "name": "name"},
1765          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1766      ]
1767  }"#;
1768
1769        #[derive(Debug, PartialEq, Clone, Serialize)]
1770        pub struct Conference {
1771            pub name: String,
1772            pub time: Option<f64>, // wrong type: f64 instead of i64
1773        }
1774
1775        let conf = Conference {
1776            name: "RustConf".to_string(),
1777            time: Some(12345678.90),
1778        };
1779
1780        let schema = Schema::parse_str(SCHEMA)?;
1781        let mut writer = Writer::new(&schema, Vec::new())?;
1782
1783        match writer.append_ser(conf) {
1784            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1785            Err(e) => {
1786                assert_eq!(
1787                    e.to_string(),
1788                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, fields: [RecordField { name: "name", schema: String, .. }, RecordField { name: "date", aliases: ["time2", "time"], schema: Union(UnionSchema { schemas: [Null, Long] }), .. }], .. }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long] }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1789                );
1790            }
1791        }
1792        Ok(())
1793    }
1794
1795    #[test]
1796    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1797        const SCHEMA: &str = r#"
1798        {
1799            "type": "record",
1800            "name": "ExampleSchema",
1801            "fields": [
1802                {"name": "exampleField", "type": "string"}
1803            ]
1804        }
1805        "#;
1806
1807        #[derive(Clone, Default)]
1808        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1809
1810        impl TestBuffer {
1811            fn len(&self) -> usize {
1812                self.0.borrow().len()
1813            }
1814        }
1815
1816        impl Write for TestBuffer {
1817            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1818                self.0.borrow_mut().write(buf)
1819            }
1820
1821            fn flush(&mut self) -> std::io::Result<()> {
1822                Ok(())
1823            }
1824        }
1825
1826        let shared_buffer = TestBuffer::default();
1827
1828        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1829
1830        let schema = Schema::parse_str(SCHEMA)?;
1831
1832        let mut writer = Writer::new(&schema, buffered_writer)?;
1833
1834        let mut record = Record::new(writer.schema()).unwrap();
1835        record.put("exampleField", "value");
1836
1837        writer.append_value(record)?;
1838        writer.flush()?;
1839
1840        assert_eq!(
1841            shared_buffer.len(),
1842            151,
1843            "the test buffer was not fully written to after Writer::flush was called"
1844        );
1845
1846        Ok(())
1847    }
1848
1849    #[test]
1850    fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
1851        #[derive(Serialize)]
1852        struct Recursive {
1853            field: bool,
1854            recurse: Option<Box<Recursive>>,
1855        }
1856
1857        impl AvroSchema for Recursive {
1858            fn get_schema() -> Schema {
1859                Schema::parse_str(
1860                    r#"{
1861                    "name": "Recursive",
1862                    "type": "record",
1863                    "fields": [
1864                        { "name": "field", "type": "boolean" },
1865                        { "name": "recurse", "type": ["null", "Recursive"] }
1866                    ]
1867                }"#,
1868                )
1869                .unwrap()
1870            }
1871        }
1872
1873        let mut buffer = Vec::new();
1874        let writer = SpecificSingleObjectWriter::new()?;
1875
1876        writer.write(
1877            Recursive {
1878                field: true,
1879                recurse: Some(Box::new(Recursive {
1880                    field: false,
1881                    recurse: None,
1882                })),
1883            },
1884            &mut buffer,
1885        )?;
1886        assert_eq!(
1887            buffer,
1888            &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
1889        );
1890
1891        Ok(())
1892    }
1893
1894    #[test]
1895    fn avro_rs_310_append_unvalidated_value() -> TestResult {
1896        let schema = Schema::String;
1897        let value = Value::Int(1);
1898
1899        let mut writer = Writer::new(&schema, Vec::new())?;
1900        writer.unvalidated_append_value_ref(&value)?;
1901        writer.unvalidated_append_value(value)?;
1902        let buffer = writer.into_inner()?;
1903
1904        // Check the last two bytes for the sync marker
1905        assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);
1906
1907        let mut writer = Writer::new(&schema, Vec::new())?;
1908        let value = Value::Int(1);
1909        let err = writer.append_value_ref(&value).unwrap_err();
1910        assert_eq!(
1911            err.to_string(),
1912            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1913        );
1914        let err = writer.append_value(value).unwrap_err();
1915        assert_eq!(
1916            err.to_string(),
1917            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1918        );
1919
1920        Ok(())
1921    }
1922
1923    #[test]
1924    fn avro_rs_469_reset_writer() -> TestResult {
1925        let schema = Schema::Boolean;
1926        let values = [true, false, true, false];
1927        let mut writer = Writer::new(&schema, Vec::new())?;
1928
1929        for value in values {
1930            writer.append_value(value)?;
1931        }
1932
1933        writer.flush()?;
1934        let first_buffer = writer.get_ref().clone();
1935
1936        writer.reset();
1937        assert_eq!(writer.get_ref().len(), 0);
1938
1939        for value in values {
1940            writer.append_value(value)?;
1941        }
1942
1943        writer.flush()?;
1944        let second_buffer = writer.get_ref().clone();
1945        assert_eq!(first_buffer.len(), second_buffer.len());
1946        // File structure:
1947        // Header: ? bytes
1948        // Sync marker: 16 bytes
1949        // Data: 6 bytes
1950        // Sync marker: 16 bytes
1951        let len = first_buffer.len();
1952        let header = len - 16 - 6 - 16;
1953        let data = header + 16;
1954        assert_eq!(
1955            first_buffer[..header],
1956            second_buffer[..header],
1957            "Written header must be the same, excluding sync marker"
1958        );
1959        assert_ne!(
1960            first_buffer[header..data],
1961            second_buffer[header..data],
1962            "Sync markers should be different"
1963        );
1964        assert_eq!(
1965            first_buffer[data..data + 6],
1966            second_buffer[data..data + 6],
1967            "Written data must be the same"
1968        );
1969        assert_ne!(
1970            first_buffer[len - 16..],
1971            second_buffer[len - 16..],
1972            "Sync markers should be different"
1973        );
1974
1975        Ok(())
1976    }
1977}