apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    serde::ser_schema::SchemaAwareWriteSerializer,
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Number of buffered, encoded bytes after which a data block is flushed automatically.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes opening every Avro object container file: `O`, `b`, `j` followed by version 1.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
35
/// Main interface for writing Avro formatted values.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
pub struct Writer<'a, W: Write> {
    /// Writer schema: values are validated and encoded against it, and it is serialized as JSON
    /// into the `avro.schema` header entry.
    schema: &'a Schema,
    /// Sink receiving the header, every data block and every sync marker.
    writer: W,
    /// Named types resolved from `schema` (or from the user-supplied `schemata`), used to resolve
    /// references while validating and encoding values.
    resolved_schema: ResolvedSchema<'a>,
    /// Compression applied to each data block when it is flushed; recorded in the header as
    /// `avro.codec` unless it is `Codec::Null`.
    codec: Codec,
    /// Once `buffer` holds at least this many bytes after an append, the block is flushed.
    block_size: usize,
    /// Encoded values of the current block; compressed in place only while being flushed.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// Sync marker written at the end of the header and after every data block.
    marker: [u8; 16],
    /// Whether the header has already been written (or its writing has been disabled).
    has_header: bool,
    /// Extra metadata entries written into the header next to the `avro.*` entries.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
170    /// Get a reference to the `Schema` associated to a `Writer`.
171    pub fn schema(&self) -> &'a Schema {
172        self.schema
173    }
174
175    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
176    /// schema validation.
177    ///
178    /// Returns the number of bytes written (it might be 0, see below).
179    ///
180    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
181    /// internal buffering for performance reasons. If you want to be sure the value has been
182    /// written, then call [`flush`](Writer::flush).
183    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184        let n = self.maybe_write_header()?;
185
186        let avro = value.into();
187        self.append_value_ref(&avro).map(|m| m + n)
188    }
189
190    /// Append a compatible value to a `Writer`, also performing schema validation.
191    ///
192    /// Returns the number of bytes written (it might be 0, see below).
193    ///
194    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
195    /// internal buffering for performance reasons. If you want to be sure the value has been
196    /// written, then call [`flush`](Writer::flush).
197    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198        let n = self.maybe_write_header()?;
199
200        write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201        self.num_values += 1;
202
203        if self.buffer.len() >= self.block_size {
204            return self.flush().map(|b| b + n);
205        }
206
207        Ok(n)
208    }
209
210    /// Append anything implementing the `Serialize` trait to a `Writer` for
211    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
212    /// validation.
213    ///
214    /// Returns the number of bytes written.
215    ///
216    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
217    /// internal buffering for performance reasons. If you want to be sure the value has been
218    /// written, then call [`flush`](Writer::flush).
219    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220        let n = self.maybe_write_header()?;
221
222        let mut serializer = SchemaAwareWriteSerializer::new(
223            &mut self.buffer,
224            self.schema,
225            self.resolved_schema.get_names(),
226            None,
227        );
228        value.serialize(&mut serializer)?;
229        self.num_values += 1;
230
231        if self.buffer.len() >= self.block_size {
232            return self.flush().map(|b| b + n);
233        }
234
235        Ok(n)
236    }
237
238    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
239    /// trait), also performing schema validation.
240    ///
241    /// Returns the number of bytes written.
242    ///
243    /// **NOTE**: This function forces the written data to be flushed (an implicit
244    /// call to [`flush`](Writer::flush) is performed).
245    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246    where
247        I: IntoIterator<Item = T>,
248    {
249        /*
250        https://github.com/rust-lang/rfcs/issues/811 :(
251        let mut stream = values
252            .filter_map(|value| value.serialize(&mut self.serializer).ok())
253            .map(|value| value.encode(self.schema))
254            .collect::<Option<Vec<_>>>()
255            .ok_or_else(|| err_msg("value does not match given schema"))?
256            .into_iter()
257            .fold(Vec::new(), |mut acc, stream| {
258                num_values += 1;
259                acc.extend(stream); acc
260            });
261        */
262
263        let mut num_bytes = 0;
264        for value in values {
265            num_bytes += self.append(value)?;
266        }
267        num_bytes += self.flush()?;
268
269        Ok(num_bytes)
270    }
271
272    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
273    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
274    /// validation.
275    ///
276    /// Returns the number of bytes written.
277    ///
278    /// **NOTE**: This function forces the written data to be flushed (an implicit
279    /// call to [`flush`](Writer::flush) is performed).
280    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281    where
282        I: IntoIterator<Item = T>,
283    {
284        /*
285        https://github.com/rust-lang/rfcs/issues/811 :(
286        let mut stream = values
287            .filter_map(|value| value.serialize(&mut self.serializer).ok())
288            .map(|value| value.encode(self.schema))
289            .collect::<Option<Vec<_>>>()
290            .ok_or_else(|| err_msg("value does not match given schema"))?
291            .into_iter()
292            .fold(Vec::new(), |mut acc, stream| {
293                num_values += 1;
294                acc.extend(stream); acc
295            });
296        */
297
298        let mut num_bytes = 0;
299        for value in values {
300            num_bytes += self.append_ser(value)?;
301        }
302        num_bytes += self.flush()?;
303
304        Ok(num_bytes)
305    }
306
307    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
308    /// validation on each value appended.
309    ///
310    /// Returns the number of bytes written.
311    ///
312    /// **NOTE**: This function forces the written data to be flushed (an implicit
313    /// call to [`flush`](Writer::flush) is performed).
314    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315        let mut num_bytes = 0;
316        for value in values {
317            num_bytes += self.append_value_ref(value)?;
318        }
319        num_bytes += self.flush()?;
320
321        Ok(num_bytes)
322    }
323
324    /// Flush the content to the inner `Writer`.
325    ///
326    /// Call this function to make sure all the content has been written before releasing the `Writer`.
327    /// This will also write the header if it wasn't written yet and hasn't been disabled using
328    /// [`WriterBuilder::has_header`].
329    ///
330    /// Returns the number of bytes written.
331    pub fn flush(&mut self) -> AvroResult<usize> {
332        let mut num_bytes = self.maybe_write_header()?;
333        if self.num_values == 0 {
334            return Ok(num_bytes);
335        }
336
337        self.codec.compress(&mut self.buffer)?;
338
339        let num_values = self.num_values;
340        let stream_len = self.buffer.len();
341
342        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343            + self.append_raw(&stream_len.into(), &Schema::Long)?
344            + self
345                .writer
346                .write(self.buffer.as_ref())
347                .map_err(Details::WriteBytes)?
348            + self.append_marker()?;
349
350        self.buffer.clear();
351        self.num_values = 0;
352
353        self.writer.flush().map_err(Details::FlushWriter)?;
354
355        Ok(num_bytes)
356    }
357
358    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
359    ///
360    /// **NOTE**: This function forces the written data to be flushed (an implicit
361    /// call to [`flush`](Writer::flush) is performed).
362    pub fn into_inner(mut self) -> AvroResult<W> {
363        self.maybe_write_header()?;
364        self.flush()?;
365
366        let mut this = ManuallyDrop::new(self);
367
368        // Extract every member that is not Copy and therefore should be dropped
369        let _buffer = std::mem::take(&mut this.buffer);
370        let _user_metadata = std::mem::take(&mut this.user_metadata);
371        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
372        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
373
374        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
375        let writer = unsafe { std::ptr::read(&this.writer) };
376
377        Ok(writer)
378    }
379
380    /// Gets a reference to the underlying writer.
381    ///
382    /// **NOTE**: There is likely data still in the buffer. To have all the data
383    /// in the writer call [`flush`](Writer::flush) first.
384    pub fn get_ref(&self) -> &W {
385        &self.writer
386    }
387
388    /// Gets a mutable reference to the underlying writer.
389    ///
390    /// It is inadvisable to directly write to the underlying writer.
391    ///
392    /// **NOTE**: There is likely data still in the buffer. To have all the data
393    /// in the writer call [`flush`](Writer::flush) first.
394    pub fn get_mut(&mut self) -> &mut W {
395        &mut self.writer
396    }
397
398    /// Generate and append synchronization marker to the payload.
399    fn append_marker(&mut self) -> AvroResult<usize> {
400        // using .writer.write directly to avoid mutable borrow of self
401        // with ref borrowing of self.marker
402        self.writer
403            .write(&self.marker)
404            .map_err(|e| Details::WriteMarker(e).into())
405    }
406
407    /// Append a raw Avro Value to the payload avoiding to encode it again.
408    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410    }
411
412    /// Append pure bytes to the payload.
413    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414        self.writer
415            .write(bytes)
416            .map_err(|e| Details::WriteBytes(e).into())
417    }
418
419    /// Adds custom metadata to the file.
420    /// This method could be used only before adding the first record to the writer.
421    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422        if !self.has_header {
423            if key.starts_with("avro.") {
424                return Err(Details::InvalidMetadataKey(key).into());
425            }
426            self.user_metadata
427                .insert(key, Value::Bytes(value.as_ref().to_vec()));
428            Ok(())
429        } else {
430            Err(Details::FileHeaderAlreadyWritten.into())
431        }
432    }
433
434    /// Create an Avro header based on schema, codec and sync marker.
435    fn header(&self) -> Result<Vec<u8>, Error> {
436        let schema_bytes = serde_json::to_string(self.schema)
437            .map_err(Details::ConvertJsonToString)?
438            .into_bytes();
439
440        let mut metadata = HashMap::with_capacity(2);
441        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
442        if self.codec != Codec::Null {
443            metadata.insert("avro.codec", self.codec.into());
444        }
445        match self.codec {
446            #[cfg(feature = "bzip")]
447            Codec::Bzip2(settings) => {
448                metadata.insert(
449                    "avro.codec.compression_level",
450                    Value::Bytes(vec![settings.compression_level]),
451                );
452            }
453            #[cfg(feature = "xz")]
454            Codec::Xz(settings) => {
455                metadata.insert(
456                    "avro.codec.compression_level",
457                    Value::Bytes(vec![settings.compression_level]),
458                );
459            }
460            #[cfg(feature = "zstandard")]
461            Codec::Zstandard(settings) => {
462                metadata.insert(
463                    "avro.codec.compression_level",
464                    Value::Bytes(vec![settings.compression_level]),
465                );
466            }
467            _ => {}
468        }
469
470        for (k, v) in &self.user_metadata {
471            metadata.insert(k.as_str(), v.clone());
472        }
473
474        let mut header = Vec::new();
475        header.extend_from_slice(AVRO_OBJECT_HEADER);
476        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
477        header.extend_from_slice(&self.marker);
478
479        Ok(header)
480    }
481
482    fn maybe_write_header(&mut self) -> AvroResult<usize> {
483        if !self.has_header {
484            let header = self.header()?;
485            let n = self.append_bytes(header.as_ref())?;
486            self.has_header = true;
487            Ok(n)
488        } else {
489            Ok(0)
490        }
491    }
492}
493
494impl<W: Write> Drop for Writer<'_, W> {
495    /// Drop the writer, will try to flush ignoring any errors.
496    fn drop(&mut self) {
497        let _ = self.maybe_write_header();
498        let _ = self.flush();
499    }
500}
501
502/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
503/// schema validation.
504///
505/// This is an internal function which gets the bytes buffer where to write as parameter instead of
506/// creating a new one like `to_avro_datum`.
507fn write_avro_datum<T: Into<Value>, W: Write>(
508    schema: &Schema,
509    value: T,
510    writer: &mut W,
511) -> Result<(), Error> {
512    let avro = value.into();
513    if !avro.validate(schema) {
514        return Err(Details::Validation.into());
515    }
516    encode(&avro, schema, writer)?;
517    Ok(())
518}
519
520fn write_avro_datum_schemata<T: Into<Value>>(
521    schema: &Schema,
522    schemata: Vec<&Schema>,
523    value: T,
524    buffer: &mut Vec<u8>,
525) -> AvroResult<usize> {
526    let avro = value.into();
527    let rs = ResolvedSchema::try_from(schemata)?;
528    let names = rs.get_names();
529    let enclosing_namespace = schema.namespace();
530    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
531        return Err(Details::Validation.into());
532    }
533    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
534}
535
/// Writer that encodes messages according to the single object encoding v1 spec.
///
/// Uses an API similar to the container-file [`Writer`]. Each message (header followed by the
/// encoded datum) is written to the target with a single `write_all` call; after a successful
/// write the internal buffer is truncated back to the header so it can be reused.
pub struct GenericSingleObjectWriter {
    // Holds the single-object header produced by the `HeaderBuilder`; while a value is being
    // written, its encoding is appended right after the header.
    buffer: Vec<u8>,
    // The writer's schema plus its resolved named types, used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
543
544impl GenericSingleObjectWriter {
545    pub fn new_with_capacity(
546        schema: &Schema,
547        initial_buffer_cap: usize,
548    ) -> AvroResult<GenericSingleObjectWriter> {
549        let header_builder = RabinFingerprintHeader::from_schema(schema);
550        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
551    }
552
553    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
554        schema: &Schema,
555        initial_buffer_cap: usize,
556        header_builder: HB,
557    ) -> AvroResult<GenericSingleObjectWriter> {
558        let mut buffer = Vec::with_capacity(initial_buffer_cap);
559        let header = header_builder.build_header();
560        buffer.extend_from_slice(&header);
561
562        Ok(GenericSingleObjectWriter {
563            buffer,
564            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
565        })
566    }
567
568    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
569
570    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
571    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
572        let original_length = self.buffer.len();
573        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
574            Err(Details::IllegalSingleObjectWriterState.into())
575        } else {
576            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
577            writer
578                .write_all(&self.buffer)
579                .map_err(Details::WriteBytes)?;
580            let len = self.buffer.len();
581            self.buffer.truncate(original_length);
582            Ok(len)
583        }
584    }
585
586    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
587    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
588        self.write_value_ref(&v, writer)
589    }
590}
591
592/// Writer that encodes messages according to the single object encoding v1 spec
593pub struct SpecificSingleObjectWriter<T>
594where
595    T: AvroSchema,
596{
597    inner: GenericSingleObjectWriter,
598    schema: Schema,
599    header_written: bool,
600    _model: PhantomData<T>,
601}
602
603impl<T> SpecificSingleObjectWriter<T>
604where
605    T: AvroSchema,
606{
607    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
608        let schema = T::get_schema();
609        Ok(SpecificSingleObjectWriter {
610            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
611            schema,
612            header_written: false,
613            _model: PhantomData,
614        })
615    }
616}
617
618impl<T> SpecificSingleObjectWriter<T>
619where
620    T: AvroSchema + Into<Value>,
621{
622    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
623    /// of bytes written including the header
624    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
625        let v: Value = data.into();
626        self.inner.write_value_ref(&v, writer)
627    }
628}
629
630impl<T> SpecificSingleObjectWriter<T>
631where
632    T: AvroSchema + Serialize,
633{
634    /// Write the referenced `Serialize` object to the provided `Write` object. Returns a result with
635    /// the number of bytes written including the header
636    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
637        let mut bytes_written: usize = 0;
638
639        if !self.header_written {
640            bytes_written += writer
641                .write(self.inner.buffer.as_slice())
642                .map_err(Details::WriteBytes)?;
643            self.header_written = true;
644        }
645
646        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
647
648        Ok(bytes_written)
649    }
650
651    /// Write the Serialize object to the provided Write object. Returns a result with the number
652    /// of bytes written including the header
653    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
654        self.write_ref(&data, writer)
655    }
656}
657
658fn write_value_ref_resolved(
659    schema: &Schema,
660    resolved_schema: &ResolvedSchema,
661    value: &Value,
662    buffer: &mut Vec<u8>,
663) -> AvroResult<usize> {
664    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
665        Some(reason) => Err(Details::ValidationWithReason {
666            value: value.clone(),
667            schema: schema.clone(),
668            reason,
669        }
670        .into()),
671        None => encode_internal(
672            value,
673            schema,
674            resolved_schema.get_names(),
675            &schema.namespace(),
676            buffer,
677        ),
678    }
679}
680
681fn write_value_ref_owned_resolved(
682    resolved_schema: &ResolvedOwnedSchema,
683    value: &Value,
684    buffer: &mut Vec<u8>,
685) -> AvroResult<()> {
686    let root_schema = resolved_schema.get_root_schema();
687    if let Some(reason) = value.validate_internal(
688        root_schema,
689        resolved_schema.get_names(),
690        &root_schema.namespace(),
691    ) {
692        return Err(Details::ValidationWithReason {
693            value: value.clone(),
694            schema: root_schema.clone(),
695            reason,
696        }
697        .into());
698    }
699    encode_internal(
700        value,
701        root_schema,
702        resolved_schema.get_names(),
703        &root_schema.namespace(),
704        buffer,
705    )?;
706    Ok(())
707}
708
709/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
710/// performing schema validation.
711///
712/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
713/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
714/// you are doing, instead.
715pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
716    let mut buffer = Vec::new();
717    write_avro_datum(schema, value, &mut buffer)?;
718    Ok(buffer)
719}
720
721/// Write the referenced [Serialize]able object to the provided [Write] object.
722/// Returns a result with the number of bytes written.
723///
724/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
725/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
726/// if you don't know what you are doing, instead.
727pub fn write_avro_datum_ref<T: Serialize, W: Write>(
728    schema: &Schema,
729    data: &T,
730    writer: &mut W,
731) -> AvroResult<usize> {
732    let names: HashMap<Name, &Schema> = HashMap::new();
733    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
734    let bytes_written = data.serialize(&mut serializer)?;
735    Ok(bytes_written)
736}
737
738/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
739/// performing schema validation.
740/// If the provided `schema` is incomplete then its dependencies must be
741/// provided in `schemata`
742pub fn to_avro_datum_schemata<T: Into<Value>>(
743    schema: &Schema,
744    schemata: Vec<&Schema>,
745    value: T,
746) -> AvroResult<Vec<u8>> {
747    let mut buffer = Vec::new();
748    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
749    Ok(buffer)
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn generate_sync_marker() -> [u8; 16] {
754    let mut marker = [0_u8; 16];
755    std::iter::repeat_with(rand::random)
756        .take(16)
757        .enumerate()
758        .for_each(|(i, n)| marker[i] = n);
759    marker
760}
761
/// Produces a fresh 16-byte sync marker on wasm32 from four random `u32`s (`quad_rand`), each
/// stored in big-endian byte order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
772
773#[cfg(test)]
774mod tests {
775    use std::{cell::RefCell, rc::Rc};
776
777    use super::*;
778    use crate::{
779        Reader,
780        decimal::Decimal,
781        duration::{Days, Duration, Millis, Months},
782        headers::GlueSchemaUuidHeader,
783        rabin::Rabin,
784        schema::{DecimalSchema, FixedSchema, Name},
785        types::Record,
786        util::zig_i64,
787    };
788    use pretty_assertions::assert_eq;
789    use serde::{Deserialize, Serialize};
790    use uuid::Uuid;
791
792    use crate::schema::InnerDecimalSchema;
793    use crate::{codec::DeflateSettings, error::Details};
794    use apache_avro_test_helper::TestResult;
795
796    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
797
798    const SCHEMA: &str = r#"
799    {
800      "type": "record",
801      "name": "test",
802      "fields": [
803        {
804          "name": "a",
805          "type": "long",
806          "default": 42
807        },
808        {
809          "name": "b",
810          "type": "string"
811        }
812      ]
813    }
814    "#;
815
816    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
817
818    #[test]
819    fn test_to_avro_datum() -> TestResult {
820        let schema = Schema::parse_str(SCHEMA)?;
821        let mut record = Record::new(&schema).unwrap();
822        record.put("a", 27i64);
823        record.put("b", "foo");
824
825        let mut expected = Vec::new();
826        zig_i64(27, &mut expected)?;
827        zig_i64(3, &mut expected)?;
828        expected.extend([b'f', b'o', b'o']);
829
830        assert_eq!(to_avro_datum(&schema, record)?, expected);
831
832        Ok(())
833    }
834
835    #[test]
836    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
837        #[derive(Serialize)]
838        struct TestStruct {
839            a: i64,
840            b: String,
841        }
842
843        let schema = Schema::parse_str(SCHEMA)?;
844        let mut writer: Vec<u8> = Vec::new();
845        let data = TestStruct {
846            a: 27,
847            b: "foo".to_string(),
848        };
849
850        let mut expected = Vec::new();
851        zig_i64(27, &mut expected)?;
852        zig_i64(3, &mut expected)?;
853        expected.extend([b'f', b'o', b'o']);
854
855        let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
856
857        assert_eq!(bytes, expected.len());
858        assert_eq!(writer, expected);
859
860        Ok(())
861    }
862
863    #[test]
864    fn avro_rs_220_flush_write_header() -> TestResult {
865        let schema = Schema::parse_str(SCHEMA)?;
866
867        // By default flush should write the header even if nothing was added yet
868        let mut writer = Writer::new(&schema, Vec::new())?;
869        writer.flush()?;
870        let result = writer.into_inner()?;
871        assert_eq!(result.len(), 147);
872
873        // Unless the user indicates via the builder that the header has already been written
874        let mut writer = Writer::builder()
875            .has_header(true)
876            .schema(&schema)
877            .writer(Vec::new())
878            .build()?;
879        writer.flush()?;
880        let result = writer.into_inner()?;
881        assert_eq!(result.len(), 0);
882
883        Ok(())
884    }
885
886    #[test]
887    fn test_union_not_null() -> TestResult {
888        let schema = Schema::parse_str(UNION_SCHEMA)?;
889        let union = Value::Union(1, Box::new(Value::Long(3)));
890
891        let mut expected = Vec::new();
892        zig_i64(1, &mut expected)?;
893        zig_i64(3, &mut expected)?;
894
895        assert_eq!(to_avro_datum(&schema, union)?, expected);
896
897        Ok(())
898    }
899
900    #[test]
901    fn test_union_null() -> TestResult {
902        let schema = Schema::parse_str(UNION_SCHEMA)?;
903        let union = Value::Union(0, Box::new(Value::Null));
904
905        let mut expected = Vec::new();
906        zig_i64(0, &mut expected)?;
907
908        assert_eq!(to_avro_datum(&schema, union)?, expected);
909
910        Ok(())
911    }
912
913    fn logical_type_test<T: Into<Value> + Clone>(
914        schema_str: &'static str,
915
916        expected_schema: &Schema,
917        value: Value,
918
919        raw_schema: &Schema,
920        raw_value: T,
921    ) -> TestResult {
922        let schema = Schema::parse_str(schema_str)?;
923        assert_eq!(&schema, expected_schema);
924        // The serialized format should be the same as the schema.
925        let ser = to_avro_datum(&schema, value.clone())?;
926        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
927        assert_eq!(ser, raw_ser);
928
929        // Should deserialize from the schema into the logical type.
930        let mut r = ser.as_slice();
931        let de = crate::from_avro_datum(&schema, &mut r, None)?;
932        assert_eq!(de, value);
933        Ok(())
934    }
935
936    #[test]
937    fn date() -> TestResult {
938        logical_type_test(
939            r#"{"type": "int", "logicalType": "date"}"#,
940            &Schema::Date,
941            Value::Date(1_i32),
942            &Schema::Int,
943            1_i32,
944        )
945    }
946
947    #[test]
948    fn time_millis() -> TestResult {
949        logical_type_test(
950            r#"{"type": "int", "logicalType": "time-millis"}"#,
951            &Schema::TimeMillis,
952            Value::TimeMillis(1_i32),
953            &Schema::Int,
954            1_i32,
955        )
956    }
957
958    #[test]
959    fn time_micros() -> TestResult {
960        logical_type_test(
961            r#"{"type": "long", "logicalType": "time-micros"}"#,
962            &Schema::TimeMicros,
963            Value::TimeMicros(1_i64),
964            &Schema::Long,
965            1_i64,
966        )
967    }
968
969    #[test]
970    fn timestamp_millis() -> TestResult {
971        logical_type_test(
972            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
973            &Schema::TimestampMillis,
974            Value::TimestampMillis(1_i64),
975            &Schema::Long,
976            1_i64,
977        )
978    }
979
980    #[test]
981    fn timestamp_micros() -> TestResult {
982        logical_type_test(
983            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
984            &Schema::TimestampMicros,
985            Value::TimestampMicros(1_i64),
986            &Schema::Long,
987            1_i64,
988        )
989    }
990
991    #[test]
992    fn decimal_fixed() -> TestResult {
993        let size = 30;
994        let fixed = FixedSchema {
995            name: Name::new("decimal")?,
996            aliases: None,
997            doc: None,
998            size,
999            default: None,
1000            attributes: Default::default(),
1001        };
1002        let inner = InnerDecimalSchema::Fixed(fixed.clone());
1003        let value = vec![0u8; size];
1004        logical_type_test(
1005            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1006            &Schema::Decimal(DecimalSchema {
1007                precision: 20,
1008                scale: 5,
1009                inner,
1010            }),
1011            Value::Decimal(Decimal::from(value.clone())),
1012            &Schema::Fixed(fixed),
1013            Value::Fixed(size, value),
1014        )
1015    }
1016
1017    #[test]
1018    fn decimal_bytes() -> TestResult {
1019        let value = vec![0u8; 10];
1020        logical_type_test(
1021            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1022            &Schema::Decimal(DecimalSchema {
1023                precision: 4,
1024                scale: 3,
1025                inner: InnerDecimalSchema::Bytes,
1026            }),
1027            Value::Decimal(Decimal::from(value.clone())),
1028            &Schema::Bytes,
1029            value,
1030        )
1031    }
1032
1033    #[test]
1034    fn duration() -> TestResult {
1035        let inner = Schema::Fixed(FixedSchema {
1036            name: Name::new("duration")?,
1037            aliases: None,
1038            doc: None,
1039            size: 12,
1040            default: None,
1041            attributes: Default::default(),
1042        });
1043        let value = Value::Duration(Duration::new(
1044            Months::new(256),
1045            Days::new(512),
1046            Millis::new(1024),
1047        ));
1048        logical_type_test(
1049            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1050            &Schema::Duration(FixedSchema {
1051                name: Name::from("duration"),
1052                aliases: None,
1053                doc: None,
1054                size: 12,
1055                default: None,
1056                attributes: Default::default(),
1057            }),
1058            value,
1059            &inner,
1060            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1061        )
1062    }
1063
1064    #[test]
1065    fn test_writer_append() -> TestResult {
1066        let schema = Schema::parse_str(SCHEMA)?;
1067        let mut writer = Writer::new(&schema, Vec::new())?;
1068
1069        let mut record = Record::new(&schema).unwrap();
1070        record.put("a", 27i64);
1071        record.put("b", "foo");
1072
1073        let n1 = writer.append(record.clone())?;
1074        let n2 = writer.append(record.clone())?;
1075        let n3 = writer.flush()?;
1076        let result = writer.into_inner()?;
1077
1078        assert_eq!(n1 + n2 + n3, result.len());
1079
1080        let mut data = Vec::new();
1081        zig_i64(27, &mut data)?;
1082        zig_i64(3, &mut data)?;
1083        data.extend(b"foo");
1084        data.extend(data.clone());
1085
1086        // starts with magic
1087        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1088        // ends with data and sync marker
1089        let last_data_byte = result.len() - 16;
1090        assert_eq!(
1091            &result[last_data_byte - data.len()..last_data_byte],
1092            data.as_slice()
1093        );
1094
1095        Ok(())
1096    }
1097
1098    #[test]
1099    fn test_writer_extend() -> TestResult {
1100        let schema = Schema::parse_str(SCHEMA)?;
1101        let mut writer = Writer::new(&schema, Vec::new())?;
1102
1103        let mut record = Record::new(&schema).unwrap();
1104        record.put("a", 27i64);
1105        record.put("b", "foo");
1106        let record_copy = record.clone();
1107        let records = vec![record, record_copy];
1108
1109        let n1 = writer.extend(records)?;
1110        let n2 = writer.flush()?;
1111        let result = writer.into_inner()?;
1112
1113        assert_eq!(n1 + n2, result.len());
1114
1115        let mut data = Vec::new();
1116        zig_i64(27, &mut data)?;
1117        zig_i64(3, &mut data)?;
1118        data.extend(b"foo");
1119        data.extend(data.clone());
1120
1121        // starts with magic
1122        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1123        // ends with data and sync marker
1124        let last_data_byte = result.len() - 16;
1125        assert_eq!(
1126            &result[last_data_byte - data.len()..last_data_byte],
1127            data.as_slice()
1128        );
1129
1130        Ok(())
1131    }
1132
    /// Serde counterpart of the record described by `SCHEMA`, used by the
    /// `append_ser` / `extend_ser` writer tests.
    // Field names and order are kept in line with the record's fields `a` and `b`.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
1138
1139    #[test]
1140    fn test_writer_append_ser() -> TestResult {
1141        let schema = Schema::parse_str(SCHEMA)?;
1142        let mut writer = Writer::new(&schema, Vec::new())?;
1143
1144        let record = TestSerdeSerialize {
1145            a: 27,
1146            b: "foo".to_owned(),
1147        };
1148
1149        let n1 = writer.append_ser(record)?;
1150        let n2 = writer.flush()?;
1151        let result = writer.into_inner()?;
1152
1153        assert_eq!(n1 + n2, result.len());
1154
1155        let mut data = Vec::new();
1156        zig_i64(27, &mut data)?;
1157        zig_i64(3, &mut data)?;
1158        data.extend(b"foo");
1159
1160        // starts with magic
1161        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1162        // ends with data and sync marker
1163        let last_data_byte = result.len() - 16;
1164        assert_eq!(
1165            &result[last_data_byte - data.len()..last_data_byte],
1166            data.as_slice()
1167        );
1168
1169        Ok(())
1170    }
1171
1172    #[test]
1173    fn test_writer_extend_ser() -> TestResult {
1174        let schema = Schema::parse_str(SCHEMA)?;
1175        let mut writer = Writer::new(&schema, Vec::new())?;
1176
1177        let record = TestSerdeSerialize {
1178            a: 27,
1179            b: "foo".to_owned(),
1180        };
1181        let record_copy = record.clone();
1182        let records = vec![record, record_copy];
1183
1184        let n1 = writer.extend_ser(records)?;
1185        let n2 = writer.flush()?;
1186        let result = writer.into_inner()?;
1187
1188        assert_eq!(n1 + n2, result.len());
1189
1190        let mut data = Vec::new();
1191        zig_i64(27, &mut data)?;
1192        zig_i64(3, &mut data)?;
1193        data.extend(b"foo");
1194        data.extend(data.clone());
1195
1196        // starts with magic
1197        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1198        // ends with data and sync marker
1199        let last_data_byte = result.len() - 16;
1200        assert_eq!(
1201            &result[last_data_byte - data.len()..last_data_byte],
1202            data.as_slice()
1203        );
1204
1205        Ok(())
1206    }
1207
1208    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1209        Writer::with_codec(
1210            schema,
1211            Vec::new(),
1212            Codec::Deflate(DeflateSettings::default()),
1213        )
1214    }
1215
1216    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1217        Writer::builder()
1218            .writer(Vec::new())
1219            .schema(schema)
1220            .codec(Codec::Deflate(DeflateSettings::default()))
1221            .block_size(100)
1222            .build()
1223    }
1224
1225    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1226        let mut record = Record::new(schema).unwrap();
1227        record.put("a", 27i64);
1228        record.put("b", "foo");
1229
1230        let n1 = writer.append(record.clone())?;
1231        let n2 = writer.append(record.clone())?;
1232        let n3 = writer.flush()?;
1233        let result = writer.into_inner()?;
1234
1235        assert_eq!(n1 + n2 + n3, result.len());
1236
1237        let mut data = Vec::new();
1238        zig_i64(27, &mut data)?;
1239        zig_i64(3, &mut data)?;
1240        data.extend(b"foo");
1241        data.extend(data.clone());
1242        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1243
1244        // starts with magic
1245        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1246        // ends with data and sync marker
1247        let last_data_byte = result.len() - 16;
1248        assert_eq!(
1249            &result[last_data_byte - data.len()..last_data_byte],
1250            data.as_slice()
1251        );
1252
1253        Ok(())
1254    }
1255
1256    #[test]
1257    fn test_writer_with_codec() -> TestResult {
1258        let schema = Schema::parse_str(SCHEMA)?;
1259        let writer = make_writer_with_codec(&schema)?;
1260        check_writer(writer, &schema)
1261    }
1262
1263    #[test]
1264    fn test_writer_with_builder() -> TestResult {
1265        let schema = Schema::parse_str(SCHEMA)?;
1266        let writer = make_writer_with_builder(&schema)?;
1267        check_writer(writer, &schema)
1268    }
1269
1270    #[test]
1271    fn test_logical_writer() -> TestResult {
1272        const LOGICAL_TYPE_SCHEMA: &str = r#"
1273        {
1274          "type": "record",
1275          "name": "logical_type_test",
1276          "fields": [
1277            {
1278              "name": "a",
1279              "type": [
1280                "null",
1281                {
1282                  "type": "long",
1283                  "logicalType": "timestamp-micros"
1284                }
1285              ]
1286            }
1287          ]
1288        }
1289        "#;
1290        let codec = Codec::Deflate(DeflateSettings::default());
1291        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1292        let mut writer = Writer::builder()
1293            .schema(&schema)
1294            .codec(codec)
1295            .writer(Vec::new())
1296            .build()?;
1297
1298        let mut record1 = Record::new(&schema).unwrap();
1299        record1.put(
1300            "a",
1301            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1302        );
1303
1304        let mut record2 = Record::new(&schema).unwrap();
1305        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1306
1307        let n1 = writer.append(record1)?;
1308        let n2 = writer.append(record2)?;
1309        let n3 = writer.flush()?;
1310        let result = writer.into_inner()?;
1311
1312        assert_eq!(n1 + n2 + n3, result.len());
1313
1314        let mut data = Vec::new();
1315        // byte indicating not null
1316        zig_i64(1, &mut data)?;
1317        zig_i64(1234, &mut data)?;
1318
1319        // byte indicating null
1320        zig_i64(0, &mut data)?;
1321        codec.compress(&mut data)?;
1322
1323        // starts with magic
1324        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1325        // ends with data and sync marker
1326        let last_data_byte = result.len() - 16;
1327        assert_eq!(
1328            &result[last_data_byte - data.len()..last_data_byte],
1329            data.as_slice()
1330        );
1331
1332        Ok(())
1333    }
1334
1335    #[test]
1336    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1337        let schema = Schema::parse_str(SCHEMA)?;
1338        let mut writer = Writer::new(&schema, Vec::new())?;
1339
1340        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1341        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1342        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1343        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1344
1345        let mut record = Record::new(&schema).unwrap();
1346        record.put("a", 27i64);
1347        record.put("b", "foo");
1348
1349        writer.append(record.clone())?;
1350        writer.append(record.clone())?;
1351        writer.flush()?;
1352        let result = writer.into_inner()?;
1353
1354        assert_eq!(result.len(), 244);
1355
1356        Ok(())
1357    }
1358
1359    #[test]
1360    fn test_avro_3881_metadata_empty_body() -> TestResult {
1361        let schema = Schema::parse_str(SCHEMA)?;
1362        let mut writer = Writer::new(&schema, Vec::new())?;
1363        writer.add_user_metadata("a".to_string(), "b")?;
1364        let result = writer.into_inner()?;
1365
1366        let reader = Reader::with_schema(&schema, &result[..])?;
1367        let mut expected = HashMap::new();
1368        expected.insert("a".to_string(), vec![b'b']);
1369        assert_eq!(reader.user_metadata(), &expected);
1370        assert_eq!(reader.into_iter().count(), 0);
1371
1372        Ok(())
1373    }
1374
1375    #[test]
1376    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1377        let schema = Schema::parse_str(SCHEMA)?;
1378        let mut writer = Writer::new(&schema, Vec::new())?;
1379
1380        let mut record = Record::new(&schema).unwrap();
1381        record.put("a", 27i64);
1382        record.put("b", "foo");
1383        writer.append(record.clone())?;
1384
1385        match writer
1386            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1387            .map_err(Error::into_details)
1388        {
1389            Err(e @ Details::FileHeaderAlreadyWritten) => {
1390                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1391            }
1392            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1393            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1394        }
1395
1396        Ok(())
1397    }
1398
1399    #[test]
1400    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1401        let schema = Schema::parse_str(SCHEMA)?;
1402        let mut writer = Writer::new(&schema, Vec::new())?;
1403
1404        let key = "avro.stringKey".to_string();
1405        match writer
1406            .add_user_metadata(key.clone(), "value")
1407            .map_err(Error::into_details)
1408        {
1409            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1410                assert_eq!(
1411                    e.to_string(),
1412                    format!(
1413                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1414                    )
1415                )
1416            }
1417            Err(e) => panic!(
1418                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1419            ),
1420            Ok(_) => {
1421                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1422            }
1423        }
1424
1425        Ok(())
1426    }
1427
1428    #[test]
1429    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1430        let schema = Schema::parse_str(SCHEMA)?;
1431
1432        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1433        user_meta_data.insert(
1434            "stringKey".to_string(),
1435            Value::String("stringValue".to_string()),
1436        );
1437        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1438        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1439
1440        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1441            .writer(Vec::new())
1442            .schema(&schema)
1443            .user_metadata(user_meta_data.clone())
1444            .build()?;
1445
1446        assert_eq!(writer.user_metadata, user_meta_data);
1447
1448        Ok(())
1449    }
1450
    /// Sample payload for the single-object writer tests. Its Avro schema comes from the
    /// `AvroSchema` impl and its `Value` form from the `From<TestSingleObjectWriter> for Value` impl.
    // Field order matches the schema's field order (a: long, b: double, c: array<string>).
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
1457
1458    impl AvroSchema for TestSingleObjectWriter {
1459        fn get_schema() -> Schema {
1460            let schema = r#"
1461            {
1462                "type":"record",
1463                "name":"TestSingleObjectWrtierSerialize",
1464                "fields":[
1465                    {
1466                        "name":"a",
1467                        "type":"long"
1468                    },
1469                    {
1470                        "name":"b",
1471                        "type":"double"
1472                    },
1473                    {
1474                        "name":"c",
1475                        "type":{
1476                            "type":"array",
1477                            "items":"string"
1478                        }
1479                    }
1480                ]
1481            }
1482            "#;
1483            Schema::parse_str(schema).unwrap()
1484        }
1485    }
1486
1487    impl From<TestSingleObjectWriter> for Value {
1488        fn from(obj: TestSingleObjectWriter) -> Value {
1489            Value::Record(vec![
1490                ("a".into(), obj.a.into()),
1491                ("b".into(), obj.b.into()),
1492                (
1493                    "c".into(),
1494                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1495                ),
1496            ])
1497        }
1498    }
1499
1500    #[test]
1501    fn test_single_object_writer() -> TestResult {
1502        let mut buf: Vec<u8> = Vec::new();
1503        let obj = TestSingleObjectWriter {
1504            a: 300,
1505            b: 34.555,
1506            c: vec!["cat".into(), "dog".into()],
1507        };
1508        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1509            &TestSingleObjectWriter::get_schema(),
1510            1024,
1511        )
1512        .expect("Should resolve schema");
1513        let value = obj.into();
1514        let written_bytes = writer
1515            .write_value_ref(&value, &mut buf)
1516            .expect("Error serializing properly");
1517
1518        assert!(buf.len() > 10, "no bytes written");
1519        assert_eq!(buf.len(), written_bytes);
1520        assert_eq!(buf[0], 0xC3);
1521        assert_eq!(buf[1], 0x01);
1522        assert_eq!(
1523            &buf[2..10],
1524            &TestSingleObjectWriter::get_schema()
1525                .fingerprint::<Rabin>()
1526                .bytes[..]
1527        );
1528        let mut msg_binary = Vec::new();
1529        encode(
1530            &value,
1531            &TestSingleObjectWriter::get_schema(),
1532            &mut msg_binary,
1533        )
1534        .expect("encode should have failed by here as a dependency of any writing");
1535        assert_eq!(&buf[10..], &msg_binary[..]);
1536
1537        Ok(())
1538    }
1539
1540    #[test]
1541    fn test_single_object_writer_with_header_builder() -> TestResult {
1542        let mut buf: Vec<u8> = Vec::new();
1543        let obj = TestSingleObjectWriter {
1544            a: 300,
1545            b: 34.555,
1546            c: vec!["cat".into(), "dog".into()],
1547        };
1548        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1549        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1550        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1551            &TestSingleObjectWriter::get_schema(),
1552            1024,
1553            header_builder,
1554        )
1555        .expect("Should resolve schema");
1556        let value = obj.into();
1557        writer
1558            .write_value_ref(&value, &mut buf)
1559            .expect("Error serializing properly");
1560
1561        assert_eq!(buf[0], 0x03);
1562        assert_eq!(buf[1], 0x00);
1563        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1564        Ok(())
1565    }
1566
1567    #[test]
1568    fn test_writer_parity() -> TestResult {
1569        let obj1 = TestSingleObjectWriter {
1570            a: 300,
1571            b: 34.555,
1572            c: vec!["cat".into(), "dog".into()],
1573        };
1574
1575        let mut buf1: Vec<u8> = Vec::new();
1576        let mut buf2: Vec<u8> = Vec::new();
1577        let mut buf3: Vec<u8> = Vec::new();
1578
1579        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1580            &TestSingleObjectWriter::get_schema(),
1581            1024,
1582        )
1583        .expect("Should resolve schema");
1584        let mut specific_writer =
1585            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1586                .expect("Resolved should pass");
1587        specific_writer
1588            .write(obj1.clone(), &mut buf1)
1589            .expect("Serialization expected");
1590        specific_writer
1591            .write_value(obj1.clone(), &mut buf2)
1592            .expect("Serialization expected");
1593        generic_writer
1594            .write_value(obj1.into(), &mut buf3)
1595            .expect("Serialization expected");
1596        assert_eq!(buf1, buf2);
1597        assert_eq!(buf1, buf3);
1598
1599        Ok(())
1600    }
1601
1602    #[test]
1603    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1604        const SCHEMA: &str = r#"
1605  {
1606      "type": "record",
1607      "name": "Conference",
1608      "fields": [
1609          {"type": "string", "name": "name"},
1610          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1611      ]
1612  }"#;
1613
1614        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1615        pub struct Conference {
1616            pub name: String,
1617            pub time: Option<i64>,
1618        }
1619
1620        let conf = Conference {
1621            name: "RustConf".to_string(),
1622            time: Some(1234567890),
1623        };
1624
1625        let schema = Schema::parse_str(SCHEMA)?;
1626        let mut writer = Writer::new(&schema, Vec::new())?;
1627
1628        let bytes = writer.append_ser(conf)?;
1629
1630        assert_eq!(182, bytes);
1631
1632        Ok(())
1633    }
1634
1635    #[test]
1636    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1637        const SCHEMA: &str = r#"
1638  {
1639      "type": "record",
1640      "name": "Conference",
1641      "fields": [
1642          {"type": "string", "name": "name"},
1643          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1644      ]
1645  }"#;
1646
1647        #[derive(Debug, PartialEq, Clone, Serialize)]
1648        pub struct Conference {
1649            pub name: String,
1650            pub time: Option<f64>, // wrong type: f64 instead of i64
1651        }
1652
1653        let conf = Conference {
1654            name: "RustConf".to_string(),
1655            time: Some(12345678.90),
1656        };
1657
1658        let schema = Schema::parse_str(SCHEMA)?;
1659        let mut writer = Writer::new(&schema, Vec::new())?;
1660
1661        match writer.append_ser(conf) {
1662            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1663            Err(e) => {
1664                assert_eq!(
1665                    e.to_string(),
1666                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1667                );
1668            }
1669        }
1670        Ok(())
1671    }
1672
1673    #[test]
1674    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1675        const SCHEMA: &str = r#"
1676        {
1677            "type": "record",
1678            "name": "ExampleSchema",
1679            "fields": [
1680                {"name": "exampleField", "type": "string"}
1681            ]
1682        }
1683        "#;
1684
1685        #[derive(Clone, Default)]
1686        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1687
1688        impl TestBuffer {
1689            fn len(&self) -> usize {
1690                self.0.borrow().len()
1691            }
1692        }
1693
1694        impl Write for TestBuffer {
1695            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1696                self.0.borrow_mut().write(buf)
1697            }
1698
1699            fn flush(&mut self) -> std::io::Result<()> {
1700                Ok(())
1701            }
1702        }
1703
1704        let shared_buffer = TestBuffer::default();
1705
1706        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1707
1708        let schema = Schema::parse_str(SCHEMA)?;
1709
1710        let mut writer = Writer::new(&schema, buffered_writer)?;
1711
1712        let mut record = Record::new(writer.schema()).unwrap();
1713        record.put("exampleField", "value");
1714
1715        writer.append(record)?;
1716        writer.flush()?;
1717
1718        assert_eq!(
1719            shared_buffer.len(),
1720            151,
1721            "the test buffer was not fully written to after Writer::flush was called"
1722        );
1723
1724        Ok(())
1725    }
1726}