apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    serde::ser_schema::SchemaAwareWriteSerializer,
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default value of the builder's `block_size`: the buffer length (in bytes) at which an
/// append triggers an automatic flush.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Magic bytes placed at the very start of the header written by `Writer`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Main interface for writing Avro formatted values.
///
/// Appended values are encoded into an in-memory buffer. The buffer is written to the
/// underlying writer as one block when it reaches `block_size` bytes or when
/// [`flush`](Writer::flush) is called.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is validated against and encoded with.
    schema: &'a Schema,
    /// Destination of the header and of every flushed block.
    writer: W,
    /// Named schemas resolved from `schema`, or from the schemata passed to the builder.
    resolved_schema: ResolvedSchema<'a>,
    /// Codec applied to the buffer right before a block is written.
    codec: Codec,
    /// Buffer length (in bytes) at which an append triggers an automatic flush.
    block_size: usize,
    /// Encoded values that have not been written to `writer` yet.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// Sync marker written at the end of the header and after every block.
    marker: [u8; 16],
    /// `true` once the header has been written, or when writing it was disabled.
    has_header: bool,
    /// Extra header entries registered through `add_user_metadata`.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
    /// Get a reference to the `Schema` associated to a `Writer`.
    ///
    /// The returned reference carries the schema's own lifetime `'a`, so it can outlive the
    /// borrow of the `Writer`.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }
174
175    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
176    /// schema validation.
177    ///
178    /// Returns the number of bytes written (it might be 0, see below).
179    ///
180    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
181    /// internal buffering for performance reasons. If you want to be sure the value has been
182    /// written, then call [`flush`](Writer::flush).
183    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184        let n = self.maybe_write_header()?;
185
186        let avro = value.into();
187        self.append_value_ref(&avro).map(|m| m + n)
188    }
189
190    /// Append a compatible value to a `Writer`, also performing schema validation.
191    ///
192    /// Returns the number of bytes written (it might be 0, see below).
193    ///
194    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
195    /// internal buffering for performance reasons. If you want to be sure the value has been
196    /// written, then call [`flush`](Writer::flush).
197    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198        let n = self.maybe_write_header()?;
199
200        write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201        self.num_values += 1;
202
203        if self.buffer.len() >= self.block_size {
204            return self.flush().map(|b| b + n);
205        }
206
207        Ok(n)
208    }
209
210    /// Append anything implementing the `Serialize` trait to a `Writer` for
211    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
212    /// validation.
213    ///
214    /// Returns the number of bytes written.
215    ///
216    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
217    /// internal buffering for performance reasons. If you want to be sure the value has been
218    /// written, then call [`flush`](Writer::flush).
219    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220        let n = self.maybe_write_header()?;
221
222        let mut serializer = SchemaAwareWriteSerializer::new(
223            &mut self.buffer,
224            self.schema,
225            self.resolved_schema.get_names(),
226            None,
227        );
228        value.serialize(&mut serializer)?;
229        self.num_values += 1;
230
231        if self.buffer.len() >= self.block_size {
232            return self.flush().map(|b| b + n);
233        }
234
235        Ok(n)
236    }
237
238    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
239    /// trait), also performing schema validation.
240    ///
241    /// Returns the number of bytes written.
242    ///
243    /// **NOTE**: This function forces the written data to be flushed (an implicit
244    /// call to [`flush`](Writer::flush) is performed).
245    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246    where
247        I: IntoIterator<Item = T>,
248    {
249        /*
250        https://github.com/rust-lang/rfcs/issues/811 :(
251        let mut stream = values
252            .filter_map(|value| value.serialize(&mut self.serializer).ok())
253            .map(|value| value.encode(self.schema))
254            .collect::<Option<Vec<_>>>()
255            .ok_or_else(|| err_msg("value does not match given schema"))?
256            .into_iter()
257            .fold(Vec::new(), |mut acc, stream| {
258                num_values += 1;
259                acc.extend(stream); acc
260            });
261        */
262
263        let mut num_bytes = 0;
264        for value in values {
265            num_bytes += self.append(value)?;
266        }
267        num_bytes += self.flush()?;
268
269        Ok(num_bytes)
270    }
271
272    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
273    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
274    /// validation.
275    ///
276    /// Returns the number of bytes written.
277    ///
278    /// **NOTE**: This function forces the written data to be flushed (an implicit
279    /// call to [`flush`](Writer::flush) is performed).
280    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281    where
282        I: IntoIterator<Item = T>,
283    {
284        /*
285        https://github.com/rust-lang/rfcs/issues/811 :(
286        let mut stream = values
287            .filter_map(|value| value.serialize(&mut self.serializer).ok())
288            .map(|value| value.encode(self.schema))
289            .collect::<Option<Vec<_>>>()
290            .ok_or_else(|| err_msg("value does not match given schema"))?
291            .into_iter()
292            .fold(Vec::new(), |mut acc, stream| {
293                num_values += 1;
294                acc.extend(stream); acc
295            });
296        */
297
298        let mut num_bytes = 0;
299        for value in values {
300            num_bytes += self.append_ser(value)?;
301        }
302        num_bytes += self.flush()?;
303
304        Ok(num_bytes)
305    }
306
307    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
308    /// validation on each value appended.
309    ///
310    /// Returns the number of bytes written.
311    ///
312    /// **NOTE**: This function forces the written data to be flushed (an implicit
313    /// call to [`flush`](Writer::flush) is performed).
314    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315        let mut num_bytes = 0;
316        for value in values {
317            num_bytes += self.append_value_ref(value)?;
318        }
319        num_bytes += self.flush()?;
320
321        Ok(num_bytes)
322    }
323
324    /// Flush the content to the inner `Writer`.
325    ///
326    /// Call this function to make sure all the content has been written before releasing the `Writer`.
327    /// This will also write the header if it wasn't written yet and hasn't been disabled using
328    /// [`WriterBuilder::has_header`].
329    ///
330    /// Returns the number of bytes written.
331    pub fn flush(&mut self) -> AvroResult<usize> {
332        let mut num_bytes = self.maybe_write_header()?;
333        if self.num_values == 0 {
334            return Ok(num_bytes);
335        }
336
337        self.codec.compress(&mut self.buffer)?;
338
339        let num_values = self.num_values;
340        let stream_len = self.buffer.len();
341
342        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343            + self.append_raw(&stream_len.into(), &Schema::Long)?
344            + self
345                .writer
346                .write(self.buffer.as_ref())
347                .map_err(Details::WriteBytes)?
348            + self.append_marker()?;
349
350        self.buffer.clear();
351        self.num_values = 0;
352
353        self.writer.flush().map_err(Details::FlushWriter)?;
354
355        Ok(num_bytes)
356    }
357
    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// **NOTE**: This function forces the written data to be flushed (an implicit
    /// call to [`flush`](Writer::flush) is performed).
    ///
    /// # Errors
    ///
    /// Returns the error from writing the header or from flushing. In that case the `Writer`
    /// is dropped normally, so its `Drop` implementation attempts one more best-effort flush.
    pub fn into_inner(mut self) -> AvroResult<W> {
        // NOTE(review): `flush` itself starts by calling `maybe_write_header`, so this first
        // call looks redundant.
        self.maybe_write_header()?;
        self.flush()?;

        // `Writer` implements `Drop`, so `writer` cannot simply be moved out of `self`.
        // Wrapping `self` in `ManuallyDrop` disables that destructor; the fields that own
        // resources are then released by hand below.
        let mut this = ManuallyDrop::new(self);

        // Extract every member that is not Copy and therefore should be dropped
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);
        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
379
    /// Gets a reference to the underlying writer.
    ///
    /// This does not flush anything.
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
387
    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer: bytes written through
    /// this reference bypass the `Writer`'s buffering and block framing.
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
397
398    /// Generate and append synchronization marker to the payload.
399    fn append_marker(&mut self) -> AvroResult<usize> {
400        // using .writer.write directly to avoid mutable borrow of self
401        // with ref borrowing of self.marker
402        self.writer
403            .write(&self.marker)
404            .map_err(|e| Details::WriteMarker(e).into())
405    }
406
407    /// Append a raw Avro Value to the payload avoiding to encode it again.
408    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410    }
411
412    /// Append pure bytes to the payload.
413    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414        self.writer
415            .write(bytes)
416            .map_err(|e| Details::WriteBytes(e).into())
417    }
418
419    /// Adds custom metadata to the file.
420    /// This method could be used only before adding the first record to the writer.
421    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422        if !self.has_header {
423            if key.starts_with("avro.") {
424                return Err(Details::InvalidMetadataKey(key).into());
425            }
426            self.user_metadata
427                .insert(key, Value::Bytes(value.as_ref().to_vec()));
428            Ok(())
429        } else {
430            Err(Details::FileHeaderAlreadyWritten.into())
431        }
432    }
433
434    /// Create an Avro header based on schema, codec and sync marker.
435    fn header(&self) -> Result<Vec<u8>, Error> {
436        let schema_bytes = serde_json::to_string(self.schema)
437            .map_err(Details::ConvertJsonToString)?
438            .into_bytes();
439
440        let mut metadata = HashMap::with_capacity(2);
441        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
442        if self.codec != Codec::Null {
443            metadata.insert("avro.codec", self.codec.into());
444        }
445        match self.codec {
446            #[cfg(feature = "bzip")]
447            Codec::Bzip2(settings) => {
448                metadata.insert(
449                    "avro.codec.compression_level",
450                    Value::Bytes(vec![settings.compression_level]),
451                );
452            }
453            #[cfg(feature = "xz")]
454            Codec::Xz(settings) => {
455                metadata.insert(
456                    "avro.codec.compression_level",
457                    Value::Bytes(vec![settings.compression_level]),
458                );
459            }
460            #[cfg(feature = "zstandard")]
461            Codec::Zstandard(settings) => {
462                metadata.insert(
463                    "avro.codec.compression_level",
464                    Value::Bytes(vec![settings.compression_level]),
465                );
466            }
467            _ => {}
468        }
469
470        for (k, v) in &self.user_metadata {
471            metadata.insert(k.as_str(), v.clone());
472        }
473
474        let mut header = Vec::new();
475        header.extend_from_slice(AVRO_OBJECT_HEADER);
476        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
477        header.extend_from_slice(&self.marker);
478
479        Ok(header)
480    }
481
482    fn maybe_write_header(&mut self) -> AvroResult<usize> {
483        if !self.has_header {
484            let header = self.header()?;
485            let n = self.append_bytes(header.as_ref())?;
486            self.has_header = true;
487            Ok(n)
488        } else {
489            Ok(0)
490        }
491    }
492}
493
impl<W: Write> Drop for Writer<'_, W> {
    /// Drop the writer, will try to flush ignoring any errors.
    ///
    /// `drop` has no way to report a failure, so call [`flush`](Writer::flush) explicitly
    /// beforehand when errors matter.
    fn drop(&mut self) {
        // Best effort only: both results are deliberately discarded.
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
501
502/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
503/// schema validation.
504///
505/// This is an internal function which gets the bytes buffer where to write as parameter instead of
506/// creating a new one like `to_avro_datum`.
507fn write_avro_datum<T: Into<Value>, W: Write>(
508    schema: &Schema,
509    value: T,
510    writer: &mut W,
511) -> Result<(), Error> {
512    let avro = value.into();
513    if !avro.validate(schema) {
514        return Err(Details::Validation.into());
515    }
516    encode(&avro, schema, writer)?;
517    Ok(())
518}
519
520fn write_avro_datum_schemata<T: Into<Value>>(
521    schema: &Schema,
522    schemata: Vec<&Schema>,
523    value: T,
524    buffer: &mut Vec<u8>,
525) -> AvroResult<usize> {
526    let avro = value.into();
527    let rs = ResolvedSchema::try_from(schemata)?;
528    let names = rs.get_names();
529    let enclosing_namespace = schema.namespace();
530    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
531        return Err(Details::Validation.into());
532    }
533    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
534}
535
/// Writer that encodes messages according to the single object encoding v1 spec
/// Uses an API similar to the current File Writer
/// Writes all object bytes at once, and drains internal buffer
pub struct GenericSingleObjectWriter {
    /// Holds the pre-built message header; each write appends the encoded value after it
    /// and cuts the buffer back to the header afterwards.
    buffer: Vec<u8>,
    /// Owned, resolved form of the schema used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
543
544impl GenericSingleObjectWriter {
545    pub fn new_with_capacity(
546        schema: &Schema,
547        initial_buffer_cap: usize,
548    ) -> AvroResult<GenericSingleObjectWriter> {
549        let header_builder = RabinFingerprintHeader::from_schema(schema);
550        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
551    }
552
553    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
554        schema: &Schema,
555        initial_buffer_cap: usize,
556        header_builder: HB,
557    ) -> AvroResult<GenericSingleObjectWriter> {
558        let mut buffer = Vec::with_capacity(initial_buffer_cap);
559        let header = header_builder.build_header();
560        buffer.extend_from_slice(&header);
561
562        Ok(GenericSingleObjectWriter {
563            buffer,
564            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
565        })
566    }
567
568    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
569
570    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
571    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
572        let original_length = self.buffer.len();
573        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
574            Err(Details::IllegalSingleObjectWriterState.into())
575        } else {
576            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
577            writer
578                .write_all(&self.buffer)
579                .map_err(Details::WriteBytes)?;
580            let len = self.buffer.len();
581            self.buffer.truncate(original_length);
582            Ok(len)
583        }
584    }
585
586    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
587    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
588        self.write_value_ref(&v, writer)
589    }
590}
591
/// Writer that encodes messages according to the single object encoding v1 spec
///
/// Unlike [`GenericSingleObjectWriter`], the schema is not passed in but obtained from the
/// type `T` through its [`AvroSchema`] implementation.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Generic writer used for `Value`-based writes; its buffer also holds the header bytes.
    inner: GenericSingleObjectWriter,
    /// Schema returned by `T::get_schema()`, used for serde-based writes.
    schema: Schema,
    /// Whether `write_ref` has already emitted the header.
    header_written: bool,
    /// Ties the writer to `T` without storing a value of that type.
    _model: PhantomData<T>,
}
602
603impl<T> SpecificSingleObjectWriter<T>
604where
605    T: AvroSchema,
606{
607    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
608        let schema = T::get_schema();
609        Ok(SpecificSingleObjectWriter {
610            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
611            schema,
612            header_written: false,
613            _model: PhantomData,
614        })
615    }
616}
617
618impl<T> SpecificSingleObjectWriter<T>
619where
620    T: AvroSchema + Into<Value>,
621{
622    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
623    /// of bytes written including the header
624    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
625        let v: Value = data.into();
626        self.inner.write_value_ref(&v, writer)
627    }
628}
629
630impl<T> SpecificSingleObjectWriter<T>
631where
632    T: AvroSchema + Serialize,
633{
634    /// Write the referenced `Serialize` object to the provided `Write` object. Returns a result with
635    /// the number of bytes written including the header
636    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
637        let mut bytes_written: usize = 0;
638
639        if !self.header_written {
640            bytes_written += writer
641                .write(self.inner.buffer.as_slice())
642                .map_err(Details::WriteBytes)?;
643            self.header_written = true;
644        }
645
646        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
647
648        Ok(bytes_written)
649    }
650
651    /// Write the Serialize object to the provided Write object. Returns a result with the number
652    /// of bytes written including the header
653    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
654        self.write_ref(&data, writer)
655    }
656}
657
658fn write_value_ref_resolved(
659    schema: &Schema,
660    resolved_schema: &ResolvedSchema,
661    value: &Value,
662    buffer: &mut Vec<u8>,
663) -> AvroResult<usize> {
664    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
665        Some(reason) => Err(Details::ValidationWithReason {
666            value: value.clone(),
667            schema: schema.clone(),
668            reason,
669        }
670        .into()),
671        None => encode_internal(
672            value,
673            schema,
674            resolved_schema.get_names(),
675            &schema.namespace(),
676            buffer,
677        ),
678    }
679}
680
681fn write_value_ref_owned_resolved(
682    resolved_schema: &ResolvedOwnedSchema,
683    value: &Value,
684    buffer: &mut Vec<u8>,
685) -> AvroResult<()> {
686    let root_schema = resolved_schema.get_root_schema();
687    if let Some(reason) = value.validate_internal(
688        root_schema,
689        resolved_schema.get_names(),
690        &root_schema.namespace(),
691    ) {
692        return Err(Details::ValidationWithReason {
693            value: value.clone(),
694            schema: root_schema.clone(),
695            reason,
696        }
697        .into());
698    }
699    encode_internal(
700        value,
701        root_schema,
702        resolved_schema.get_names(),
703        &root_schema.namespace(),
704        buffer,
705    )?;
706    Ok(())
707}
708
709/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
710/// performing schema validation.
711///
712/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
713/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
714/// you are doing, instead.
715pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
716    let mut buffer = Vec::new();
717    write_avro_datum(schema, value, &mut buffer)?;
718    Ok(buffer)
719}
720
721/// Write the referenced [Serialize]able object to the provided [Write] object.
722/// Returns a result with the number of bytes written.
723///
724/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
725/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
726/// if you don't know what you are doing, instead.
727pub fn write_avro_datum_ref<T: Serialize, W: Write>(
728    schema: &Schema,
729    data: &T,
730    writer: &mut W,
731) -> AvroResult<usize> {
732    let names: HashMap<Name, &Schema> = HashMap::new();
733    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
734    let bytes_written = data.serialize(&mut serializer)?;
735    Ok(bytes_written)
736}
737
738/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
739/// performing schema validation.
740/// If the provided `schema` is incomplete then its dependencies must be
741/// provided in `schemata`
742pub fn to_avro_datum_schemata<T: Into<Value>>(
743    schema: &Schema,
744    schemata: Vec<&Schema>,
745    value: T,
746) -> AvroResult<Vec<u8>> {
747    let mut buffer = Vec::new();
748    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
749    Ok(buffer)
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn generate_sync_marker() -> [u8; 16] {
754    let mut marker = [0_u8; 16];
755    std::iter::repeat_with(rand::random)
756        .take(16)
757        .enumerate()
758        .for_each(|(i, n)| marker[i] = n);
759    marker
760}
761
/// Produce a fresh 16-byte sync marker on `wasm32` from four `quad_rand::rand` draws, each
/// laid out in big-endian byte order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
772
773#[cfg(test)]
774mod tests {
775    use std::{cell::RefCell, rc::Rc};
776
777    use super::*;
778    use crate::{
779        Reader,
780        decimal::Decimal,
781        duration::{Days, Duration, Millis, Months},
782        headers::GlueSchemaUuidHeader,
783        rabin::Rabin,
784        schema::{DecimalSchema, FixedSchema, Name},
785        types::Record,
786        util::zig_i64,
787    };
788    use pretty_assertions::assert_eq;
789    use serde::{Deserialize, Serialize};
790    use uuid::Uuid;
791
792    use crate::schema::InnerDecimalSchema;
793    use crate::{codec::DeflateSettings, error::Details};
794    use apache_avro_test_helper::TestResult;
795
796    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
797
798    const SCHEMA: &str = r#"
799    {
800      "type": "record",
801      "name": "test",
802      "fields": [
803        {
804          "name": "a",
805          "type": "long",
806          "default": 42
807        },
808        {
809          "name": "b",
810          "type": "string"
811        }
812      ]
813    }
814    "#;
815
816    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
817
818    #[test]
819    fn test_to_avro_datum() -> TestResult {
820        let schema = Schema::parse_str(SCHEMA)?;
821        let mut record = Record::new(&schema).unwrap();
822        record.put("a", 27i64);
823        record.put("b", "foo");
824
825        let mut expected = Vec::new();
826        zig_i64(27, &mut expected)?;
827        zig_i64(3, &mut expected)?;
828        expected.extend([b'f', b'o', b'o']);
829
830        assert_eq!(to_avro_datum(&schema, record)?, expected);
831
832        Ok(())
833    }
834
835    #[test]
836    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
837        #[derive(Serialize)]
838        struct TestStruct {
839            a: i64,
840            b: String,
841        }
842
843        let schema = Schema::parse_str(SCHEMA)?;
844        let mut writer: Vec<u8> = Vec::new();
845        let data = TestStruct {
846            a: 27,
847            b: "foo".to_string(),
848        };
849
850        let mut expected = Vec::new();
851        zig_i64(27, &mut expected)?;
852        zig_i64(3, &mut expected)?;
853        expected.extend([b'f', b'o', b'o']);
854
855        let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
856
857        assert_eq!(bytes, expected.len());
858        assert_eq!(writer, expected);
859
860        Ok(())
861    }
862
863    #[test]
864    fn avro_rs_220_flush_write_header() -> TestResult {
865        let schema = Schema::parse_str(SCHEMA)?;
866
867        // By default flush should write the header even if nothing was added yet
868        let mut writer = Writer::new(&schema, Vec::new())?;
869        writer.flush()?;
870        let result = writer.into_inner()?;
871        assert_eq!(result.len(), 147);
872
873        // Unless the user indicates via the builder that the header has already been written
874        let mut writer = Writer::builder()
875            .has_header(true)
876            .schema(&schema)
877            .writer(Vec::new())
878            .build()?;
879        writer.flush()?;
880        let result = writer.into_inner()?;
881        assert_eq!(result.len(), 0);
882
883        Ok(())
884    }
885
886    #[test]
887    fn test_union_not_null() -> TestResult {
888        let schema = Schema::parse_str(UNION_SCHEMA)?;
889        let union = Value::Union(1, Box::new(Value::Long(3)));
890
891        let mut expected = Vec::new();
892        zig_i64(1, &mut expected)?;
893        zig_i64(3, &mut expected)?;
894
895        assert_eq!(to_avro_datum(&schema, union)?, expected);
896
897        Ok(())
898    }
899
900    #[test]
901    fn test_union_null() -> TestResult {
902        let schema = Schema::parse_str(UNION_SCHEMA)?;
903        let union = Value::Union(0, Box::new(Value::Null));
904
905        let mut expected = Vec::new();
906        zig_i64(0, &mut expected)?;
907
908        assert_eq!(to_avro_datum(&schema, union)?, expected);
909
910        Ok(())
911    }
912
913    fn logical_type_test<T: Into<Value> + Clone>(
914        schema_str: &'static str,
915
916        expected_schema: &Schema,
917        value: Value,
918
919        raw_schema: &Schema,
920        raw_value: T,
921    ) -> TestResult {
922        let schema = Schema::parse_str(schema_str)?;
923        assert_eq!(&schema, expected_schema);
924        // The serialized format should be the same as the schema.
925        let ser = to_avro_datum(&schema, value.clone())?;
926        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
927        assert_eq!(ser, raw_ser);
928
929        // Should deserialize from the schema into the logical type.
930        let mut r = ser.as_slice();
931        let de = crate::from_avro_datum(&schema, &mut r, None)?;
932        assert_eq!(de, value);
933        Ok(())
934    }
935
936    #[test]
937    fn date() -> TestResult {
938        logical_type_test(
939            r#"{"type": "int", "logicalType": "date"}"#,
940            &Schema::Date,
941            Value::Date(1_i32),
942            &Schema::Int,
943            1_i32,
944        )
945    }
946
947    #[test]
948    fn time_millis() -> TestResult {
949        logical_type_test(
950            r#"{"type": "int", "logicalType": "time-millis"}"#,
951            &Schema::TimeMillis,
952            Value::TimeMillis(1_i32),
953            &Schema::Int,
954            1_i32,
955        )
956    }
957
958    #[test]
959    fn time_micros() -> TestResult {
960        logical_type_test(
961            r#"{"type": "long", "logicalType": "time-micros"}"#,
962            &Schema::TimeMicros,
963            Value::TimeMicros(1_i64),
964            &Schema::Long,
965            1_i64,
966        )
967    }
968
969    #[test]
970    fn timestamp_millis() -> TestResult {
971        logical_type_test(
972            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
973            &Schema::TimestampMillis,
974            Value::TimestampMillis(1_i64),
975            &Schema::Long,
976            1_i64,
977        )
978    }
979
980    #[test]
981    fn timestamp_micros() -> TestResult {
982        logical_type_test(
983            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
984            &Schema::TimestampMicros,
985            Value::TimestampMicros(1_i64),
986            &Schema::Long,
987            1_i64,
988        )
989    }
990
991    #[test]
992    fn decimal_fixed() -> TestResult {
993        let size = 30;
994        let fixed = FixedSchema {
995            name: Name::new("decimal")?,
996            aliases: None,
997            doc: None,
998            size,
999            default: None,
1000            attributes: Default::default(),
1001        };
1002        let inner = InnerDecimalSchema::Fixed(fixed.clone());
1003        let value = vec![0u8; size];
1004        logical_type_test(
1005            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1006            &Schema::Decimal(DecimalSchema {
1007                precision: 20,
1008                scale: 5,
1009                inner,
1010            }),
1011            Value::Decimal(Decimal::from(value.clone())),
1012            &Schema::Fixed(fixed),
1013            Value::Fixed(size, value),
1014        )
1015    }
1016
1017    #[test]
1018    fn decimal_bytes() -> TestResult {
1019        let value = vec![0u8; 10];
1020        logical_type_test(
1021            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1022            &Schema::Decimal(DecimalSchema {
1023                precision: 4,
1024                scale: 3,
1025                inner: InnerDecimalSchema::Bytes,
1026            }),
1027            Value::Decimal(Decimal::from(value.clone())),
1028            &Schema::Bytes,
1029            value,
1030        )
1031    }
1032
1033    #[test]
1034    fn duration() -> TestResult {
1035        let inner = Schema::Fixed(FixedSchema {
1036            name: Name::new("duration")?,
1037            aliases: None,
1038            doc: None,
1039            size: 12,
1040            default: None,
1041            attributes: Default::default(),
1042        });
1043        let value = Value::Duration(Duration::new(
1044            Months::new(256),
1045            Days::new(512),
1046            Millis::new(1024),
1047        ));
1048        logical_type_test(
1049            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1050            &Schema::Duration,
1051            value,
1052            &inner,
1053            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1054        )
1055    }
1056
1057    #[test]
1058    fn test_writer_append() -> TestResult {
1059        let schema = Schema::parse_str(SCHEMA)?;
1060        let mut writer = Writer::new(&schema, Vec::new())?;
1061
1062        let mut record = Record::new(&schema).unwrap();
1063        record.put("a", 27i64);
1064        record.put("b", "foo");
1065
1066        let n1 = writer.append(record.clone())?;
1067        let n2 = writer.append(record.clone())?;
1068        let n3 = writer.flush()?;
1069        let result = writer.into_inner()?;
1070
1071        assert_eq!(n1 + n2 + n3, result.len());
1072
1073        let mut data = Vec::new();
1074        zig_i64(27, &mut data)?;
1075        zig_i64(3, &mut data)?;
1076        data.extend(b"foo");
1077        data.extend(data.clone());
1078
1079        // starts with magic
1080        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1081        // ends with data and sync marker
1082        let last_data_byte = result.len() - 16;
1083        assert_eq!(
1084            &result[last_data_byte - data.len()..last_data_byte],
1085            data.as_slice()
1086        );
1087
1088        Ok(())
1089    }
1090
1091    #[test]
1092    fn test_writer_extend() -> TestResult {
1093        let schema = Schema::parse_str(SCHEMA)?;
1094        let mut writer = Writer::new(&schema, Vec::new())?;
1095
1096        let mut record = Record::new(&schema).unwrap();
1097        record.put("a", 27i64);
1098        record.put("b", "foo");
1099        let record_copy = record.clone();
1100        let records = vec![record, record_copy];
1101
1102        let n1 = writer.extend(records)?;
1103        let n2 = writer.flush()?;
1104        let result = writer.into_inner()?;
1105
1106        assert_eq!(n1 + n2, result.len());
1107
1108        let mut data = Vec::new();
1109        zig_i64(27, &mut data)?;
1110        zig_i64(3, &mut data)?;
1111        data.extend(b"foo");
1112        data.extend(data.clone());
1113
1114        // starts with magic
1115        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1116        // ends with data and sync marker
1117        let last_data_byte = result.len() - 16;
1118        assert_eq!(
1119            &result[last_data_byte - data.len()..last_data_byte],
1120            data.as_slice()
1121        );
1122
1123        Ok(())
1124    }
1125
    /// Plain serde struct with an `i64` field `a` and a `String` field `b`, used as input
    /// for the `append_ser` / `extend_ser` writer tests.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
1131
1132    #[test]
1133    fn test_writer_append_ser() -> TestResult {
1134        let schema = Schema::parse_str(SCHEMA)?;
1135        let mut writer = Writer::new(&schema, Vec::new())?;
1136
1137        let record = TestSerdeSerialize {
1138            a: 27,
1139            b: "foo".to_owned(),
1140        };
1141
1142        let n1 = writer.append_ser(record)?;
1143        let n2 = writer.flush()?;
1144        let result = writer.into_inner()?;
1145
1146        assert_eq!(n1 + n2, result.len());
1147
1148        let mut data = Vec::new();
1149        zig_i64(27, &mut data)?;
1150        zig_i64(3, &mut data)?;
1151        data.extend(b"foo");
1152
1153        // starts with magic
1154        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1155        // ends with data and sync marker
1156        let last_data_byte = result.len() - 16;
1157        assert_eq!(
1158            &result[last_data_byte - data.len()..last_data_byte],
1159            data.as_slice()
1160        );
1161
1162        Ok(())
1163    }
1164
1165    #[test]
1166    fn test_writer_extend_ser() -> TestResult {
1167        let schema = Schema::parse_str(SCHEMA)?;
1168        let mut writer = Writer::new(&schema, Vec::new())?;
1169
1170        let record = TestSerdeSerialize {
1171            a: 27,
1172            b: "foo".to_owned(),
1173        };
1174        let record_copy = record.clone();
1175        let records = vec![record, record_copy];
1176
1177        let n1 = writer.extend_ser(records)?;
1178        let n2 = writer.flush()?;
1179        let result = writer.into_inner()?;
1180
1181        assert_eq!(n1 + n2, result.len());
1182
1183        let mut data = Vec::new();
1184        zig_i64(27, &mut data)?;
1185        zig_i64(3, &mut data)?;
1186        data.extend(b"foo");
1187        data.extend(data.clone());
1188
1189        // starts with magic
1190        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1191        // ends with data and sync marker
1192        let last_data_byte = result.len() - 16;
1193        assert_eq!(
1194            &result[last_data_byte - data.len()..last_data_byte],
1195            data.as_slice()
1196        );
1197
1198        Ok(())
1199    }
1200
1201    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1202        Writer::with_codec(
1203            schema,
1204            Vec::new(),
1205            Codec::Deflate(DeflateSettings::default()),
1206        )
1207    }
1208
1209    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1210        Writer::builder()
1211            .writer(Vec::new())
1212            .schema(schema)
1213            .codec(Codec::Deflate(DeflateSettings::default()))
1214            .block_size(100)
1215            .build()
1216    }
1217
1218    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1219        let mut record = Record::new(schema).unwrap();
1220        record.put("a", 27i64);
1221        record.put("b", "foo");
1222
1223        let n1 = writer.append(record.clone())?;
1224        let n2 = writer.append(record.clone())?;
1225        let n3 = writer.flush()?;
1226        let result = writer.into_inner()?;
1227
1228        assert_eq!(n1 + n2 + n3, result.len());
1229
1230        let mut data = Vec::new();
1231        zig_i64(27, &mut data)?;
1232        zig_i64(3, &mut data)?;
1233        data.extend(b"foo");
1234        data.extend(data.clone());
1235        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1236
1237        // starts with magic
1238        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1239        // ends with data and sync marker
1240        let last_data_byte = result.len() - 16;
1241        assert_eq!(
1242            &result[last_data_byte - data.len()..last_data_byte],
1243            data.as_slice()
1244        );
1245
1246        Ok(())
1247    }
1248
1249    #[test]
1250    fn test_writer_with_codec() -> TestResult {
1251        let schema = Schema::parse_str(SCHEMA)?;
1252        let writer = make_writer_with_codec(&schema)?;
1253        check_writer(writer, &schema)
1254    }
1255
1256    #[test]
1257    fn test_writer_with_builder() -> TestResult {
1258        let schema = Schema::parse_str(SCHEMA)?;
1259        let writer = make_writer_with_builder(&schema)?;
1260        check_writer(writer, &schema)
1261    }
1262
1263    #[test]
1264    fn test_logical_writer() -> TestResult {
1265        const LOGICAL_TYPE_SCHEMA: &str = r#"
1266        {
1267          "type": "record",
1268          "name": "logical_type_test",
1269          "fields": [
1270            {
1271              "name": "a",
1272              "type": [
1273                "null",
1274                {
1275                  "type": "long",
1276                  "logicalType": "timestamp-micros"
1277                }
1278              ]
1279            }
1280          ]
1281        }
1282        "#;
1283        let codec = Codec::Deflate(DeflateSettings::default());
1284        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1285        let mut writer = Writer::builder()
1286            .schema(&schema)
1287            .codec(codec)
1288            .writer(Vec::new())
1289            .build()?;
1290
1291        let mut record1 = Record::new(&schema).unwrap();
1292        record1.put(
1293            "a",
1294            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1295        );
1296
1297        let mut record2 = Record::new(&schema).unwrap();
1298        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1299
1300        let n1 = writer.append(record1)?;
1301        let n2 = writer.append(record2)?;
1302        let n3 = writer.flush()?;
1303        let result = writer.into_inner()?;
1304
1305        assert_eq!(n1 + n2 + n3, result.len());
1306
1307        let mut data = Vec::new();
1308        // byte indicating not null
1309        zig_i64(1, &mut data)?;
1310        zig_i64(1234, &mut data)?;
1311
1312        // byte indicating null
1313        zig_i64(0, &mut data)?;
1314        codec.compress(&mut data)?;
1315
1316        // starts with magic
1317        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1318        // ends with data and sync marker
1319        let last_data_byte = result.len() - 16;
1320        assert_eq!(
1321            &result[last_data_byte - data.len()..last_data_byte],
1322            data.as_slice()
1323        );
1324
1325        Ok(())
1326    }
1327
1328    #[test]
1329    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1330        let schema = Schema::parse_str(SCHEMA)?;
1331        let mut writer = Writer::new(&schema, Vec::new())?;
1332
1333        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1334        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1335        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1336        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1337
1338        let mut record = Record::new(&schema).unwrap();
1339        record.put("a", 27i64);
1340        record.put("b", "foo");
1341
1342        writer.append(record.clone())?;
1343        writer.append(record.clone())?;
1344        writer.flush()?;
1345        let result = writer.into_inner()?;
1346
1347        assert_eq!(result.len(), 244);
1348
1349        Ok(())
1350    }
1351
1352    #[test]
1353    fn test_avro_3881_metadata_empty_body() -> TestResult {
1354        let schema = Schema::parse_str(SCHEMA)?;
1355        let mut writer = Writer::new(&schema, Vec::new())?;
1356        writer.add_user_metadata("a".to_string(), "b")?;
1357        let result = writer.into_inner()?;
1358
1359        let reader = Reader::with_schema(&schema, &result[..])?;
1360        let mut expected = HashMap::new();
1361        expected.insert("a".to_string(), vec![b'b']);
1362        assert_eq!(reader.user_metadata(), &expected);
1363        assert_eq!(reader.into_iter().count(), 0);
1364
1365        Ok(())
1366    }
1367
1368    #[test]
1369    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1370        let schema = Schema::parse_str(SCHEMA)?;
1371        let mut writer = Writer::new(&schema, Vec::new())?;
1372
1373        let mut record = Record::new(&schema).unwrap();
1374        record.put("a", 27i64);
1375        record.put("b", "foo");
1376        writer.append(record.clone())?;
1377
1378        match writer
1379            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1380            .map_err(Error::into_details)
1381        {
1382            Err(e @ Details::FileHeaderAlreadyWritten) => {
1383                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1384            }
1385            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1386            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1387        }
1388
1389        Ok(())
1390    }
1391
1392    #[test]
1393    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1394        let schema = Schema::parse_str(SCHEMA)?;
1395        let mut writer = Writer::new(&schema, Vec::new())?;
1396
1397        let key = "avro.stringKey".to_string();
1398        match writer
1399            .add_user_metadata(key.clone(), "value")
1400            .map_err(Error::into_details)
1401        {
1402            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1403                assert_eq!(
1404                    e.to_string(),
1405                    format!(
1406                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1407                    )
1408                )
1409            }
1410            Err(e) => panic!(
1411                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1412            ),
1413            Ok(_) => {
1414                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1415            }
1416        }
1417
1418        Ok(())
1419    }
1420
1421    #[test]
1422    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1423        let schema = Schema::parse_str(SCHEMA)?;
1424
1425        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1426        user_meta_data.insert(
1427            "stringKey".to_string(),
1428            Value::String("stringValue".to_string()),
1429        );
1430        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1431        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1432
1433        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1434            .writer(Vec::new())
1435            .schema(&schema)
1436            .user_metadata(user_meta_data.clone())
1437            .build()?;
1438
1439        assert_eq!(writer.user_metadata, user_meta_data);
1440
1441        Ok(())
1442    }
1443
    /// Sample payload for the single-object writer tests: an `i64`, an `f64` and a list
    /// of strings.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
1450
1451    impl AvroSchema for TestSingleObjectWriter {
1452        fn get_schema() -> Schema {
1453            let schema = r#"
1454            {
1455                "type":"record",
1456                "name":"TestSingleObjectWrtierSerialize",
1457                "fields":[
1458                    {
1459                        "name":"a",
1460                        "type":"long"
1461                    },
1462                    {
1463                        "name":"b",
1464                        "type":"double"
1465                    },
1466                    {
1467                        "name":"c",
1468                        "type":{
1469                            "type":"array",
1470                            "items":"string"
1471                        }
1472                    }
1473                ]
1474            }
1475            "#;
1476            Schema::parse_str(schema).unwrap()
1477        }
1478    }
1479
1480    impl From<TestSingleObjectWriter> for Value {
1481        fn from(obj: TestSingleObjectWriter) -> Value {
1482            Value::Record(vec![
1483                ("a".into(), obj.a.into()),
1484                ("b".into(), obj.b.into()),
1485                (
1486                    "c".into(),
1487                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1488                ),
1489            ])
1490        }
1491    }
1492
1493    #[test]
1494    fn test_single_object_writer() -> TestResult {
1495        let mut buf: Vec<u8> = Vec::new();
1496        let obj = TestSingleObjectWriter {
1497            a: 300,
1498            b: 34.555,
1499            c: vec!["cat".into(), "dog".into()],
1500        };
1501        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1502            &TestSingleObjectWriter::get_schema(),
1503            1024,
1504        )
1505        .expect("Should resolve schema");
1506        let value = obj.into();
1507        let written_bytes = writer
1508            .write_value_ref(&value, &mut buf)
1509            .expect("Error serializing properly");
1510
1511        assert!(buf.len() > 10, "no bytes written");
1512        assert_eq!(buf.len(), written_bytes);
1513        assert_eq!(buf[0], 0xC3);
1514        assert_eq!(buf[1], 0x01);
1515        assert_eq!(
1516            &buf[2..10],
1517            &TestSingleObjectWriter::get_schema()
1518                .fingerprint::<Rabin>()
1519                .bytes[..]
1520        );
1521        let mut msg_binary = Vec::new();
1522        encode(
1523            &value,
1524            &TestSingleObjectWriter::get_schema(),
1525            &mut msg_binary,
1526        )
1527        .expect("encode should have failed by here as a dependency of any writing");
1528        assert_eq!(&buf[10..], &msg_binary[..]);
1529
1530        Ok(())
1531    }
1532
1533    #[test]
1534    fn test_single_object_writer_with_header_builder() -> TestResult {
1535        let mut buf: Vec<u8> = Vec::new();
1536        let obj = TestSingleObjectWriter {
1537            a: 300,
1538            b: 34.555,
1539            c: vec!["cat".into(), "dog".into()],
1540        };
1541        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1542        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1543        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1544            &TestSingleObjectWriter::get_schema(),
1545            1024,
1546            header_builder,
1547        )
1548        .expect("Should resolve schema");
1549        let value = obj.into();
1550        writer
1551            .write_value_ref(&value, &mut buf)
1552            .expect("Error serializing properly");
1553
1554        assert_eq!(buf[0], 0x03);
1555        assert_eq!(buf[1], 0x00);
1556        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1557        Ok(())
1558    }
1559
1560    #[test]
1561    fn test_writer_parity() -> TestResult {
1562        let obj1 = TestSingleObjectWriter {
1563            a: 300,
1564            b: 34.555,
1565            c: vec!["cat".into(), "dog".into()],
1566        };
1567
1568        let mut buf1: Vec<u8> = Vec::new();
1569        let mut buf2: Vec<u8> = Vec::new();
1570        let mut buf3: Vec<u8> = Vec::new();
1571
1572        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1573            &TestSingleObjectWriter::get_schema(),
1574            1024,
1575        )
1576        .expect("Should resolve schema");
1577        let mut specific_writer =
1578            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1579                .expect("Resolved should pass");
1580        specific_writer
1581            .write(obj1.clone(), &mut buf1)
1582            .expect("Serialization expected");
1583        specific_writer
1584            .write_value(obj1.clone(), &mut buf2)
1585            .expect("Serialization expected");
1586        generic_writer
1587            .write_value(obj1.into(), &mut buf3)
1588            .expect("Serialization expected");
1589        assert_eq!(buf1, buf2);
1590        assert_eq!(buf1, buf3);
1591
1592        Ok(())
1593    }
1594
1595    #[test]
1596    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1597        const SCHEMA: &str = r#"
1598  {
1599      "type": "record",
1600      "name": "Conference",
1601      "fields": [
1602          {"type": "string", "name": "name"},
1603          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1604      ]
1605  }"#;
1606
1607        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1608        pub struct Conference {
1609            pub name: String,
1610            pub time: Option<i64>,
1611        }
1612
1613        let conf = Conference {
1614            name: "RustConf".to_string(),
1615            time: Some(1234567890),
1616        };
1617
1618        let schema = Schema::parse_str(SCHEMA)?;
1619        let mut writer = Writer::new(&schema, Vec::new())?;
1620
1621        let bytes = writer.append_ser(conf)?;
1622
1623        assert_eq!(182, bytes);
1624
1625        Ok(())
1626    }
1627
1628    #[test]
1629    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1630        const SCHEMA: &str = r#"
1631  {
1632      "type": "record",
1633      "name": "Conference",
1634      "fields": [
1635          {"type": "string", "name": "name"},
1636          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1637      ]
1638  }"#;
1639
1640        #[derive(Debug, PartialEq, Clone, Serialize)]
1641        pub struct Conference {
1642            pub name: String,
1643            pub time: Option<f64>, // wrong type: f64 instead of i64
1644        }
1645
1646        let conf = Conference {
1647            name: "RustConf".to_string(),
1648            time: Some(12345678.90),
1649        };
1650
1651        let schema = Schema::parse_str(SCHEMA)?;
1652        let mut writer = Writer::new(&schema, Vec::new())?;
1653
1654        match writer.append_ser(conf) {
1655            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1656            Err(e) => {
1657                assert_eq!(
1658                    e.to_string(),
1659                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1660                );
1661            }
1662        }
1663        Ok(())
1664    }
1665
1666    #[test]
1667    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1668        const SCHEMA: &str = r#"
1669        {
1670            "type": "record",
1671            "name": "ExampleSchema",
1672            "fields": [
1673                {"name": "exampleField", "type": "string"}
1674            ]
1675        }
1676        "#;
1677
1678        #[derive(Clone, Default)]
1679        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1680
1681        impl TestBuffer {
1682            fn len(&self) -> usize {
1683                self.0.borrow().len()
1684            }
1685        }
1686
1687        impl Write for TestBuffer {
1688            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1689                self.0.borrow_mut().write(buf)
1690            }
1691
1692            fn flush(&mut self) -> std::io::Result<()> {
1693                Ok(())
1694            }
1695        }
1696
1697        let shared_buffer = TestBuffer::default();
1698
1699        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1700
1701        let schema = Schema::parse_str(SCHEMA)?;
1702
1703        let mut writer = Writer::new(&schema, buffered_writer)?;
1704
1705        let mut record = Record::new(writer.schema()).unwrap();
1706        record.put("exampleField", "value");
1707
1708        writer.append(record)?;
1709        writer.flush()?;
1710
1711        assert_eq!(
1712            shared_buffer.len(),
1713            151,
1714            "the test buffer was not fully written to after Writer::flush was called"
1715        );
1716
1717        Ok(())
1718    }
1719}