apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    ser_schema::SchemaAwareWriteSerializer,
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default value of `Writer::block_size`: the initial capacity of the block buffer and the
/// buffered size (in bytes) at which an append triggers a flush.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Magic bytes placed at the very start of the header built by `Writer::header`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Main interface for writing Avro formatted values.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
    /// Schema that appended values are validated and encoded against.
    schema: &'a Schema,
    /// Destination that receives the header, the data blocks and the sync markers.
    writer: W,
    // Resolved view of the schema; set by the constructors, or lazily on the first append when
    // the `Writer` was created through the builder.
    #[builder(skip)]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Codec applied to the buffered block on every flush.
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Buffered size, in bytes, at which an append flushes the block automatically.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    // Encoded values waiting to be flushed as one block; pre-allocated to `block_size` bytes.
    #[builder(skip = Vec::with_capacity(block_size))]
    buffer: Vec<u8>,
    // Number of values currently held in `buffer`.
    #[builder(skip)]
    num_values: usize,
    /// 16-byte sync marker written at the end of the header and after every block.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// Has the header already been written.
    ///
    /// To disable writing the header, this can be set to `true`.
    #[builder(default = false)]
    has_header: bool,
    /// Extra key/value pairs added to the header metadata map.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
65
66impl<'a, W: Write> Writer<'a, W> {
67    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
68    /// to.
69    /// No compression `Codec` will be used.
70    pub fn new(schema: &'a Schema, writer: W) -> Self {
71        Writer::with_codec(schema, writer, Codec::Null)
72    }
73
74    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
75    /// `io::Write` trait to write to.
76    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
77        let mut w = Self::builder()
78            .schema(schema)
79            .writer(writer)
80            .codec(codec)
81            .build();
82        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
83        w
84    }
85
86    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
87    /// `io::Write` trait to write to.
88    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
89    /// be provided in `schemata`.
90    pub fn with_schemata(
91        schema: &'a Schema,
92        schemata: Vec<&'a Schema>,
93        writer: W,
94        codec: Codec,
95    ) -> Self {
96        let mut w = Self::builder()
97            .schema(schema)
98            .writer(writer)
99            .codec(codec)
100            .build();
101        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
102        w
103    }
104
105    /// Creates a `Writer` that will append values to already populated
106    /// `std::io::Write` using the provided `marker`
107    /// No compression `Codec` will be used.
108    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
109        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
110    }
111
112    /// Creates a `Writer` that will append values to already populated
113    /// `std::io::Write` using the provided `marker`
114    pub fn append_to_with_codec(
115        schema: &'a Schema,
116        writer: W,
117        codec: Codec,
118        marker: [u8; 16],
119    ) -> Self {
120        let mut w = Self::builder()
121            .schema(schema)
122            .writer(writer)
123            .codec(codec)
124            .marker(marker)
125            .has_header(true)
126            .build();
127        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
128        w
129    }
130
131    /// Creates a `Writer` that will append values to already populated
132    /// `std::io::Write` using the provided `marker`
133    pub fn append_to_with_codec_schemata(
134        schema: &'a Schema,
135        schemata: Vec<&'a Schema>,
136        writer: W,
137        codec: Codec,
138        marker: [u8; 16],
139    ) -> Self {
140        let mut w = Self::builder()
141            .schema(schema)
142            .writer(writer)
143            .codec(codec)
144            .marker(marker)
145            .has_header(true)
146            .build();
147        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
148        w
149    }
150
    /// Get a reference to the `Schema` associated to a `Writer`.
    ///
    /// The returned reference carries the schema's own lifetime `'a`, not the lifetime of the
    /// borrow of the `Writer`.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }
155
156    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
157    /// schema validation.
158    ///
159    /// Returns the number of bytes written (it might be 0, see below).
160    ///
161    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
162    /// internal buffering for performance reasons. If you want to be sure the value has been
163    /// written, then call [`flush`](Writer::flush).
164    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
165        let n = self.maybe_write_header()?;
166
167        let avro = value.into();
168        self.append_value_ref(&avro).map(|m| m + n)
169    }
170
171    /// Append a compatible value to a `Writer`, also performing schema validation.
172    ///
173    /// Returns the number of bytes written (it might be 0, see below).
174    ///
175    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
176    /// internal buffering for performance reasons. If you want to be sure the value has been
177    /// written, then call [`flush`](Writer::flush).
178    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
179        let n = self.maybe_write_header()?;
180
181        // Lazy init for users using the builder pattern with error throwing
182        match self.resolved_schema {
183            Some(ref rs) => {
184                write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
185                self.num_values += 1;
186
187                if self.buffer.len() >= self.block_size {
188                    return self.flush().map(|b| b + n);
189                }
190
191                Ok(n)
192            }
193            None => {
194                let rs = ResolvedSchema::try_from(self.schema)?;
195                self.resolved_schema = Some(rs);
196                self.append_value_ref(value)
197            }
198        }
199    }
200
201    /// Append anything implementing the `Serialize` trait to a `Writer` for
202    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
203    /// validation.
204    ///
205    /// Returns the number of bytes written.
206    ///
207    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
208    /// internal buffering for performance reasons. If you want to be sure the value has been
209    /// written, then call [`flush`](Writer::flush).
210    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
211        let n = self.maybe_write_header()?;
212
213        match self.resolved_schema {
214            Some(ref rs) => {
215                let mut serializer = SchemaAwareWriteSerializer::new(
216                    &mut self.buffer,
217                    self.schema,
218                    rs.get_names(),
219                    None,
220                );
221                value.serialize(&mut serializer)?;
222                self.num_values += 1;
223
224                if self.buffer.len() >= self.block_size {
225                    return self.flush().map(|b| b + n);
226                }
227
228                Ok(n)
229            }
230            None => {
231                let rs = ResolvedSchema::try_from(self.schema)?;
232                self.resolved_schema = Some(rs);
233                self.append_ser(value)
234            }
235        }
236    }
237
238    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
239    /// trait), also performing schema validation.
240    ///
241    /// Returns the number of bytes written.
242    ///
243    /// **NOTE**: This function forces the written data to be flushed (an implicit
244    /// call to [`flush`](Writer::flush) is performed).
245    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246    where
247        I: IntoIterator<Item = T>,
248    {
249        /*
250        https://github.com/rust-lang/rfcs/issues/811 :(
251        let mut stream = values
252            .filter_map(|value| value.serialize(&mut self.serializer).ok())
253            .map(|value| value.encode(self.schema))
254            .collect::<Option<Vec<_>>>()
255            .ok_or_else(|| err_msg("value does not match given schema"))?
256            .into_iter()
257            .fold(Vec::new(), |mut acc, stream| {
258                num_values += 1;
259                acc.extend(stream); acc
260            });
261        */
262
263        let mut num_bytes = 0;
264        for value in values {
265            num_bytes += self.append(value)?;
266        }
267        num_bytes += self.flush()?;
268
269        Ok(num_bytes)
270    }
271
272    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
273    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
274    /// validation.
275    ///
276    /// Returns the number of bytes written.
277    ///
278    /// **NOTE**: This function forces the written data to be flushed (an implicit
279    /// call to [`flush`](Writer::flush) is performed).
280    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281    where
282        I: IntoIterator<Item = T>,
283    {
284        /*
285        https://github.com/rust-lang/rfcs/issues/811 :(
286        let mut stream = values
287            .filter_map(|value| value.serialize(&mut self.serializer).ok())
288            .map(|value| value.encode(self.schema))
289            .collect::<Option<Vec<_>>>()
290            .ok_or_else(|| err_msg("value does not match given schema"))?
291            .into_iter()
292            .fold(Vec::new(), |mut acc, stream| {
293                num_values += 1;
294                acc.extend(stream); acc
295            });
296        */
297
298        let mut num_bytes = 0;
299        for value in values {
300            num_bytes += self.append_ser(value)?;
301        }
302        num_bytes += self.flush()?;
303
304        Ok(num_bytes)
305    }
306
307    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
308    /// validation on each value appended.
309    ///
310    /// Returns the number of bytes written.
311    ///
312    /// **NOTE**: This function forces the written data to be flushed (an implicit
313    /// call to [`flush`](Writer::flush) is performed).
314    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315        let mut num_bytes = 0;
316        for value in values {
317            num_bytes += self.append_value_ref(value)?;
318        }
319        num_bytes += self.flush()?;
320
321        Ok(num_bytes)
322    }
323
324    /// Flush the content to the inner `Writer`.
325    ///
326    /// Call this function to make sure all the content has been written before releasing the `Writer`.
327    /// This will also write the header if it wasn't written yet and hasn't been disabled using
328    /// [`WriterBuilder::has_header`].
329    ///
330    /// Returns the number of bytes written.
331    pub fn flush(&mut self) -> AvroResult<usize> {
332        let mut num_bytes = self.maybe_write_header()?;
333        if self.num_values == 0 {
334            return Ok(num_bytes);
335        }
336
337        self.codec.compress(&mut self.buffer)?;
338
339        let num_values = self.num_values;
340        let stream_len = self.buffer.len();
341
342        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343            + self.append_raw(&stream_len.into(), &Schema::Long)?
344            + self
345                .writer
346                .write(self.buffer.as_ref())
347                .map_err(Details::WriteBytes)?
348            + self.append_marker()?;
349
350        self.buffer.clear();
351        self.num_values = 0;
352
353        self.writer.flush().map_err(Details::FlushWriter)?;
354
355        Ok(num_bytes)
356    }
357
    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// If writing the header or flushing fails, the error is returned and the `Writer` is
    /// dropped normally, which attempts one more best-effort flush.
    ///
    /// **NOTE**: This function forces the written data to be flushed (an implicit
    /// call to [`flush`](Writer::flush) is performed).
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.maybe_write_header()?;
        self.flush()?;

        // `Writer` implements `Drop`, so `writer` cannot simply be moved out of `self`.
        // Wrapping `self` in `ManuallyDrop` stops `Drop::drop` (and its flush) from running.
        let mut this = ManuallyDrop::new(self);

        // Extract every member that is not Copy and therefore should be dropped
        // (taking them here lets them be freed at the end of this function instead of leaking).
        let _resolved_schema = std::mem::take(&mut this.resolved_schema);
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
378
    /// Gets a reference to the underlying writer.
    ///
    /// This does not flush anything.
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
386
    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer: bytes written this way
    /// bypass the block framing done by [`flush`](Writer::flush).
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
396
397    /// Generate and append synchronization marker to the payload.
398    fn append_marker(&mut self) -> AvroResult<usize> {
399        // using .writer.write directly to avoid mutable borrow of self
400        // with ref borrowing of self.marker
401        self.writer
402            .write(&self.marker)
403            .map_err(|e| Details::WriteMarker(e).into())
404    }
405
406    /// Append a raw Avro Value to the payload avoiding to encode it again.
407    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
408        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
409    }
410
411    /// Append pure bytes to the payload.
412    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
413        self.writer
414            .write(bytes)
415            .map_err(|e| Details::WriteBytes(e).into())
416    }
417
418    /// Adds custom metadata to the file.
419    /// This method could be used only before adding the first record to the writer.
420    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
421        if !self.has_header {
422            if key.starts_with("avro.") {
423                return Err(Details::InvalidMetadataKey(key).into());
424            }
425            self.user_metadata
426                .insert(key, Value::Bytes(value.as_ref().to_vec()));
427            Ok(())
428        } else {
429            Err(Details::FileHeaderAlreadyWritten.into())
430        }
431    }
432
433    /// Create an Avro header based on schema, codec and sync marker.
434    fn header(&self) -> Result<Vec<u8>, Error> {
435        let schema_bytes = serde_json::to_string(self.schema)
436            .map_err(Details::ConvertJsonToString)?
437            .into_bytes();
438
439        let mut metadata = HashMap::with_capacity(2);
440        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
441        metadata.insert("avro.codec", self.codec.into());
442        match self.codec {
443            #[cfg(feature = "bzip")]
444            Codec::Bzip2(settings) => {
445                metadata.insert(
446                    "avro.codec.compression_level",
447                    Value::Bytes(vec![settings.compression_level]),
448                );
449            }
450            #[cfg(feature = "xz")]
451            Codec::Xz(settings) => {
452                metadata.insert(
453                    "avro.codec.compression_level",
454                    Value::Bytes(vec![settings.compression_level]),
455                );
456            }
457            #[cfg(feature = "zstandard")]
458            Codec::Zstandard(settings) => {
459                metadata.insert(
460                    "avro.codec.compression_level",
461                    Value::Bytes(vec![settings.compression_level]),
462                );
463            }
464            _ => {}
465        }
466
467        for (k, v) in &self.user_metadata {
468            metadata.insert(k.as_str(), v.clone());
469        }
470
471        let mut header = Vec::new();
472        header.extend_from_slice(AVRO_OBJECT_HEADER);
473        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
474        header.extend_from_slice(&self.marker);
475
476        Ok(header)
477    }
478
479    fn maybe_write_header(&mut self) -> AvroResult<usize> {
480        if !self.has_header {
481            let header = self.header()?;
482            let n = self.append_bytes(header.as_ref())?;
483            self.has_header = true;
484            Ok(n)
485        } else {
486            Ok(0)
487        }
488    }
489}
490
impl<W: Write> Drop for Writer<'_, W> {
    /// Drop the writer, will try to flush ignoring any errors.
    ///
    /// Errors cannot be reported from `drop`; call [`Writer::flush`] or [`Writer::into_inner`]
    /// beforehand to observe them.
    fn drop(&mut self) {
        // Best effort only: both results are deliberately discarded.
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
498
499/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
500/// schema validation.
501///
502/// This is an internal function which gets the bytes buffer where to write as parameter instead of
503/// creating a new one like `to_avro_datum`.
504fn write_avro_datum<T: Into<Value>, W: Write>(
505    schema: &Schema,
506    value: T,
507    writer: &mut W,
508) -> Result<(), Error> {
509    let avro = value.into();
510    if !avro.validate(schema) {
511        return Err(Details::Validation.into());
512    }
513    encode(&avro, schema, writer)?;
514    Ok(())
515}
516
517fn write_avro_datum_schemata<T: Into<Value>>(
518    schema: &Schema,
519    schemata: Vec<&Schema>,
520    value: T,
521    buffer: &mut Vec<u8>,
522) -> AvroResult<usize> {
523    let avro = value.into();
524    let rs = ResolvedSchema::try_from(schemata)?;
525    let names = rs.get_names();
526    let enclosing_namespace = schema.namespace();
527    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
528        return Err(Details::Validation.into());
529    }
530    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
531}
532
/// Writer that encodes messages according to the single object encoding v1 spec
/// Uses an API similar to the current File Writer
/// Writes all object bytes at once, and drains internal buffer
pub struct GenericSingleObjectWriter {
    /// Reusable message buffer; between writes it holds only the pre-built header.
    buffer: Vec<u8>,
    /// Owned, resolved copy of the schema used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
540
541impl GenericSingleObjectWriter {
542    pub fn new_with_capacity(
543        schema: &Schema,
544        initial_buffer_cap: usize,
545    ) -> AvroResult<GenericSingleObjectWriter> {
546        let header_builder = RabinFingerprintHeader::from_schema(schema);
547        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
548    }
549
550    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
551        schema: &Schema,
552        initial_buffer_cap: usize,
553        header_builder: HB,
554    ) -> AvroResult<GenericSingleObjectWriter> {
555        let mut buffer = Vec::with_capacity(initial_buffer_cap);
556        let header = header_builder.build_header();
557        buffer.extend_from_slice(&header);
558
559        Ok(GenericSingleObjectWriter {
560            buffer,
561            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
562        })
563    }
564
565    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
566
567    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
568    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
569        let original_length = self.buffer.len();
570        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
571            Err(Details::IllegalSingleObjectWriterState.into())
572        } else {
573            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
574            writer
575                .write_all(&self.buffer)
576                .map_err(Details::WriteBytes)?;
577            let len = self.buffer.len();
578            self.buffer.truncate(original_length);
579            Ok(len)
580        }
581    }
582
583    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
584    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
585        self.write_value_ref(&v, writer)
586    }
587}
588
/// Writer that encodes messages according to the single object encoding v1 spec
///
/// The schema is taken from `T` through its [`AvroSchema`] implementation.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Generic writer that owns the pre-built header and handles `Value`-based writes.
    inner: GenericSingleObjectWriter,
    /// Schema of `T`, used by the `Serialize`-based write path.
    schema: Schema,
    /// Whether the `Serialize`-based write path has already emitted the header.
    header_written: bool,
    /// Ties the writer to `T` without storing a value of it.
    _model: PhantomData<T>,
}
599
600impl<T> SpecificSingleObjectWriter<T>
601where
602    T: AvroSchema,
603{
604    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
605        let schema = T::get_schema();
606        Ok(SpecificSingleObjectWriter {
607            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
608            schema,
609            header_written: false,
610            _model: PhantomData,
611        })
612    }
613}
614
615impl<T> SpecificSingleObjectWriter<T>
616where
617    T: AvroSchema + Into<Value>,
618{
619    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
620    /// of bytes written including the header
621    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
622        let v: Value = data.into();
623        self.inner.write_value_ref(&v, writer)
624    }
625}
626
627impl<T> SpecificSingleObjectWriter<T>
628where
629    T: AvroSchema + Serialize,
630{
631    /// Write the referenced `Serialize` object to the provided `Write` object. Returns a result with
632    /// the number of bytes written including the header
633    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
634        let mut bytes_written: usize = 0;
635
636        if !self.header_written {
637            bytes_written += writer
638                .write(self.inner.buffer.as_slice())
639                .map_err(Details::WriteBytes)?;
640            self.header_written = true;
641        }
642
643        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
644
645        Ok(bytes_written)
646    }
647
648    /// Write the Serialize object to the provided Write object. Returns a result with the number
649    /// of bytes written including the header
650    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
651        self.write_ref(&data, writer)
652    }
653}
654
655fn write_value_ref_resolved(
656    schema: &Schema,
657    resolved_schema: &ResolvedSchema,
658    value: &Value,
659    buffer: &mut Vec<u8>,
660) -> AvroResult<usize> {
661    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
662        Some(reason) => Err(Details::ValidationWithReason {
663            value: value.clone(),
664            schema: schema.clone(),
665            reason,
666        }
667        .into()),
668        None => encode_internal(
669            value,
670            schema,
671            resolved_schema.get_names(),
672            &schema.namespace(),
673            buffer,
674        ),
675    }
676}
677
678fn write_value_ref_owned_resolved(
679    resolved_schema: &ResolvedOwnedSchema,
680    value: &Value,
681    buffer: &mut Vec<u8>,
682) -> AvroResult<()> {
683    let root_schema = resolved_schema.get_root_schema();
684    if let Some(reason) = value.validate_internal(
685        root_schema,
686        resolved_schema.get_names(),
687        &root_schema.namespace(),
688    ) {
689        return Err(Details::ValidationWithReason {
690            value: value.clone(),
691            schema: root_schema.clone(),
692            reason,
693        }
694        .into());
695    }
696    encode_internal(
697        value,
698        root_schema,
699        resolved_schema.get_names(),
700        &root_schema.namespace(),
701        buffer,
702    )?;
703    Ok(())
704}
705
706/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
707/// performing schema validation.
708///
709/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
710/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
711/// you are doing, instead.
712pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
713    let mut buffer = Vec::new();
714    write_avro_datum(schema, value, &mut buffer)?;
715    Ok(buffer)
716}
717
718/// Write the referenced [Serialize]able object to the provided [Write] object.
719/// Returns a result with the number of bytes written.
720///
721/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
722/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
723/// if you don't know what you are doing, instead.
724pub fn write_avro_datum_ref<T: Serialize, W: Write>(
725    schema: &Schema,
726    data: &T,
727    writer: &mut W,
728) -> AvroResult<usize> {
729    let names: HashMap<Name, &Schema> = HashMap::new();
730    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
731    let bytes_written = data.serialize(&mut serializer)?;
732    Ok(bytes_written)
733}
734
735/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
736/// performing schema validation.
737/// If the provided `schema` is incomplete then its dependencies must be
738/// provided in `schemata`
739pub fn to_avro_datum_schemata<T: Into<Value>>(
740    schema: &Schema,
741    schemata: Vec<&Schema>,
742    value: T,
743) -> AvroResult<Vec<u8>> {
744    let mut buffer = Vec::new();
745    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
746    Ok(buffer)
747}
748
749#[cfg(not(target_arch = "wasm32"))]
750fn generate_sync_marker() -> [u8; 16] {
751    let mut marker = [0_u8; 16];
752    std::iter::repeat_with(rand::random)
753        .take(16)
754        .enumerate()
755        .for_each(|(i, n)| marker[i] = n);
756    marker
757}
758
/// Produces a fresh sync marker on `wasm32` from four `quad_rand::rand` draws, each
/// contributing its big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
769
770#[cfg(test)]
771mod tests {
772    use std::{cell::RefCell, rc::Rc};
773
774    use super::*;
775    use crate::{
776        Reader,
777        decimal::Decimal,
778        duration::{Days, Duration, Millis, Months},
779        headers::GlueSchemaUuidHeader,
780        rabin::Rabin,
781        schema::{DecimalSchema, FixedSchema, Name},
782        types::Record,
783        util::zig_i64,
784    };
785    use pretty_assertions::assert_eq;
786    use serde::{Deserialize, Serialize};
787    use uuid::Uuid;
788
789    use crate::{codec::DeflateSettings, error::Details};
790    use apache_avro_test_helper::TestResult;
791
    /// Length, in bytes, of the magic prefix of the file header.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    /// Record schema used by most tests: a `long` field `a` (default 42) and a `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// Union schema of `null` (branch 0) and `long` (branch 1).
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
813
814    #[test]
815    fn test_to_avro_datum() -> TestResult {
816        let schema = Schema::parse_str(SCHEMA)?;
817        let mut record = Record::new(&schema).unwrap();
818        record.put("a", 27i64);
819        record.put("b", "foo");
820
821        let mut expected = Vec::new();
822        zig_i64(27, &mut expected)?;
823        zig_i64(3, &mut expected)?;
824        expected.extend([b'f', b'o', b'o']);
825
826        assert_eq!(to_avro_datum(&schema, record)?, expected);
827
828        Ok(())
829    }
830
831    #[test]
832    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
833        #[derive(Serialize)]
834        struct TestStruct {
835            a: i64,
836            b: String,
837        }
838
839        let schema = Schema::parse_str(SCHEMA)?;
840        let mut writer: Vec<u8> = Vec::new();
841        let data = TestStruct {
842            a: 27,
843            b: "foo".to_string(),
844        };
845
846        let mut expected = Vec::new();
847        zig_i64(27, &mut expected)?;
848        zig_i64(3, &mut expected)?;
849        expected.extend([b'f', b'o', b'o']);
850
851        let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
852
853        assert_eq!(bytes, expected.len());
854        assert_eq!(writer, expected);
855
856        Ok(())
857    }
858
859    #[test]
860    fn avro_rs_220_flush_write_header() -> TestResult {
861        let schema = Schema::parse_str(SCHEMA)?;
862
863        // By default flush should write the header even if nothing was added yet
864        let mut writer = Writer::new(&schema, Vec::new());
865        writer.flush()?;
866        let result = writer.into_inner()?;
867        assert_eq!(result.len(), 163);
868
869        // Unless the user indicates via the builder that the header has already been written
870        let mut writer = Writer::builder()
871            .has_header(true)
872            .schema(&schema)
873            .writer(Vec::new())
874            .build();
875        writer.flush()?;
876        let result = writer.into_inner()?;
877        assert_eq!(result.len(), 0);
878
879        Ok(())
880    }
881
882    #[test]
883    fn test_union_not_null() -> TestResult {
884        let schema = Schema::parse_str(UNION_SCHEMA)?;
885        let union = Value::Union(1, Box::new(Value::Long(3)));
886
887        let mut expected = Vec::new();
888        zig_i64(1, &mut expected)?;
889        zig_i64(3, &mut expected)?;
890
891        assert_eq!(to_avro_datum(&schema, union)?, expected);
892
893        Ok(())
894    }
895
896    #[test]
897    fn test_union_null() -> TestResult {
898        let schema = Schema::parse_str(UNION_SCHEMA)?;
899        let union = Value::Union(0, Box::new(Value::Null));
900
901        let mut expected = Vec::new();
902        zig_i64(0, &mut expected)?;
903
904        assert_eq!(to_avro_datum(&schema, union)?, expected);
905
906        Ok(())
907    }
908
909    fn logical_type_test<T: Into<Value> + Clone>(
910        schema_str: &'static str,
911
912        expected_schema: &Schema,
913        value: Value,
914
915        raw_schema: &Schema,
916        raw_value: T,
917    ) -> TestResult {
918        let schema = Schema::parse_str(schema_str)?;
919        assert_eq!(&schema, expected_schema);
920        // The serialized format should be the same as the schema.
921        let ser = to_avro_datum(&schema, value.clone())?;
922        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
923        assert_eq!(ser, raw_ser);
924
925        // Should deserialize from the schema into the logical type.
926        let mut r = ser.as_slice();
927        let de = crate::from_avro_datum(&schema, &mut r, None)?;
928        assert_eq!(de, value);
929        Ok(())
930    }
931
932    #[test]
933    fn date() -> TestResult {
934        logical_type_test(
935            r#"{"type": "int", "logicalType": "date"}"#,
936            &Schema::Date,
937            Value::Date(1_i32),
938            &Schema::Int,
939            1_i32,
940        )
941    }
942
943    #[test]
944    fn time_millis() -> TestResult {
945        logical_type_test(
946            r#"{"type": "int", "logicalType": "time-millis"}"#,
947            &Schema::TimeMillis,
948            Value::TimeMillis(1_i32),
949            &Schema::Int,
950            1_i32,
951        )
952    }
953
954    #[test]
955    fn time_micros() -> TestResult {
956        logical_type_test(
957            r#"{"type": "long", "logicalType": "time-micros"}"#,
958            &Schema::TimeMicros,
959            Value::TimeMicros(1_i64),
960            &Schema::Long,
961            1_i64,
962        )
963    }
964
965    #[test]
966    fn timestamp_millis() -> TestResult {
967        logical_type_test(
968            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
969            &Schema::TimestampMillis,
970            Value::TimestampMillis(1_i64),
971            &Schema::Long,
972            1_i64,
973        )
974    }
975
976    #[test]
977    fn timestamp_micros() -> TestResult {
978        logical_type_test(
979            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
980            &Schema::TimestampMicros,
981            Value::TimestampMicros(1_i64),
982            &Schema::Long,
983            1_i64,
984        )
985    }
986
987    #[test]
988    fn decimal_fixed() -> TestResult {
989        let size = 30;
990        let inner = Schema::Fixed(FixedSchema {
991            name: Name::new("decimal")?,
992            aliases: None,
993            doc: None,
994            size,
995            default: None,
996            attributes: Default::default(),
997        });
998        let value = vec![0u8; size];
999        logical_type_test(
1000            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1001            &Schema::Decimal(DecimalSchema {
1002                precision: 20,
1003                scale: 5,
1004                inner: Box::new(inner.clone()),
1005            }),
1006            Value::Decimal(Decimal::from(value.clone())),
1007            &inner,
1008            Value::Fixed(size, value),
1009        )
1010    }
1011
1012    #[test]
1013    fn decimal_bytes() -> TestResult {
1014        let inner = Schema::Bytes;
1015        let value = vec![0u8; 10];
1016        logical_type_test(
1017            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1018            &Schema::Decimal(DecimalSchema {
1019                precision: 4,
1020                scale: 3,
1021                inner: Box::new(inner.clone()),
1022            }),
1023            Value::Decimal(Decimal::from(value.clone())),
1024            &inner,
1025            value,
1026        )
1027    }
1028
1029    #[test]
1030    fn duration() -> TestResult {
1031        let inner = Schema::Fixed(FixedSchema {
1032            name: Name::new("duration")?,
1033            aliases: None,
1034            doc: None,
1035            size: 12,
1036            default: None,
1037            attributes: Default::default(),
1038        });
1039        let value = Value::Duration(Duration::new(
1040            Months::new(256),
1041            Days::new(512),
1042            Millis::new(1024),
1043        ));
1044        logical_type_test(
1045            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1046            &Schema::Duration,
1047            value,
1048            &inner,
1049            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1050        )
1051    }
1052
1053    #[test]
1054    fn test_writer_append() -> TestResult {
1055        let schema = Schema::parse_str(SCHEMA)?;
1056        let mut writer = Writer::new(&schema, Vec::new());
1057
1058        let mut record = Record::new(&schema).unwrap();
1059        record.put("a", 27i64);
1060        record.put("b", "foo");
1061
1062        let n1 = writer.append(record.clone())?;
1063        let n2 = writer.append(record.clone())?;
1064        let n3 = writer.flush()?;
1065        let result = writer.into_inner()?;
1066
1067        assert_eq!(n1 + n2 + n3, result.len());
1068
1069        let mut data = Vec::new();
1070        zig_i64(27, &mut data)?;
1071        zig_i64(3, &mut data)?;
1072        data.extend(b"foo");
1073        data.extend(data.clone());
1074
1075        // starts with magic
1076        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1077        // ends with data and sync marker
1078        let last_data_byte = result.len() - 16;
1079        assert_eq!(
1080            &result[last_data_byte - data.len()..last_data_byte],
1081            data.as_slice()
1082        );
1083
1084        Ok(())
1085    }
1086
1087    #[test]
1088    fn test_writer_extend() -> TestResult {
1089        let schema = Schema::parse_str(SCHEMA)?;
1090        let mut writer = Writer::new(&schema, Vec::new());
1091
1092        let mut record = Record::new(&schema).unwrap();
1093        record.put("a", 27i64);
1094        record.put("b", "foo");
1095        let record_copy = record.clone();
1096        let records = vec![record, record_copy];
1097
1098        let n1 = writer.extend(records)?;
1099        let n2 = writer.flush()?;
1100        let result = writer.into_inner()?;
1101
1102        assert_eq!(n1 + n2, result.len());
1103
1104        let mut data = Vec::new();
1105        zig_i64(27, &mut data)?;
1106        zig_i64(3, &mut data)?;
1107        data.extend(b"foo");
1108        data.extend(data.clone());
1109
1110        // starts with magic
1111        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1112        // ends with data and sync marker
1113        let last_data_byte = result.len() - 16;
1114        assert_eq!(
1115            &result[last_data_byte - data.len()..last_data_byte],
1116            data.as_slice()
1117        );
1118
1119        Ok(())
1120    }
1121
    /// Serde-serializable test value with an `a: i64` and a `b: String` field, fed to
    /// `Writer::append_ser` / `Writer::extend_ser` in the tests below.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
1127
1128    #[test]
1129    fn test_writer_append_ser() -> TestResult {
1130        let schema = Schema::parse_str(SCHEMA)?;
1131        let mut writer = Writer::new(&schema, Vec::new());
1132
1133        let record = TestSerdeSerialize {
1134            a: 27,
1135            b: "foo".to_owned(),
1136        };
1137
1138        let n1 = writer.append_ser(record)?;
1139        let n2 = writer.flush()?;
1140        let result = writer.into_inner()?;
1141
1142        assert_eq!(n1 + n2, result.len());
1143
1144        let mut data = Vec::new();
1145        zig_i64(27, &mut data)?;
1146        zig_i64(3, &mut data)?;
1147        data.extend(b"foo");
1148
1149        // starts with magic
1150        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1151        // ends with data and sync marker
1152        let last_data_byte = result.len() - 16;
1153        assert_eq!(
1154            &result[last_data_byte - data.len()..last_data_byte],
1155            data.as_slice()
1156        );
1157
1158        Ok(())
1159    }
1160
1161    #[test]
1162    fn test_writer_extend_ser() -> TestResult {
1163        let schema = Schema::parse_str(SCHEMA)?;
1164        let mut writer = Writer::new(&schema, Vec::new());
1165
1166        let record = TestSerdeSerialize {
1167            a: 27,
1168            b: "foo".to_owned(),
1169        };
1170        let record_copy = record.clone();
1171        let records = vec![record, record_copy];
1172
1173        let n1 = writer.extend_ser(records)?;
1174        let n2 = writer.flush()?;
1175        let result = writer.into_inner()?;
1176
1177        assert_eq!(n1 + n2, result.len());
1178
1179        let mut data = Vec::new();
1180        zig_i64(27, &mut data)?;
1181        zig_i64(3, &mut data)?;
1182        data.extend(b"foo");
1183        data.extend(data.clone());
1184
1185        // starts with magic
1186        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1187        // ends with data and sync marker
1188        let last_data_byte = result.len() - 16;
1189        assert_eq!(
1190            &result[last_data_byte - data.len()..last_data_byte],
1191            data.as_slice()
1192        );
1193
1194        Ok(())
1195    }
1196
1197    fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1198        Writer::with_codec(
1199            schema,
1200            Vec::new(),
1201            Codec::Deflate(DeflateSettings::default()),
1202        )
1203    }
1204
1205    fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1206        Writer::builder()
1207            .writer(Vec::new())
1208            .schema(schema)
1209            .codec(Codec::Deflate(DeflateSettings::default()))
1210            .block_size(100)
1211            .build()
1212    }
1213
1214    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1215        let mut record = Record::new(schema).unwrap();
1216        record.put("a", 27i64);
1217        record.put("b", "foo");
1218
1219        let n1 = writer.append(record.clone())?;
1220        let n2 = writer.append(record.clone())?;
1221        let n3 = writer.flush()?;
1222        let result = writer.into_inner()?;
1223
1224        assert_eq!(n1 + n2 + n3, result.len());
1225
1226        let mut data = Vec::new();
1227        zig_i64(27, &mut data)?;
1228        zig_i64(3, &mut data)?;
1229        data.extend(b"foo");
1230        data.extend(data.clone());
1231        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1232
1233        // starts with magic
1234        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1235        // ends with data and sync marker
1236        let last_data_byte = result.len() - 16;
1237        assert_eq!(
1238            &result[last_data_byte - data.len()..last_data_byte],
1239            data.as_slice()
1240        );
1241
1242        Ok(())
1243    }
1244
1245    #[test]
1246    fn test_writer_with_codec() -> TestResult {
1247        let schema = Schema::parse_str(SCHEMA)?;
1248        let writer = make_writer_with_codec(&schema);
1249        check_writer(writer, &schema)
1250    }
1251
1252    #[test]
1253    fn test_writer_with_builder() -> TestResult {
1254        let schema = Schema::parse_str(SCHEMA)?;
1255        let writer = make_writer_with_builder(&schema);
1256        check_writer(writer, &schema)
1257    }
1258
1259    #[test]
1260    fn test_logical_writer() -> TestResult {
1261        const LOGICAL_TYPE_SCHEMA: &str = r#"
1262        {
1263          "type": "record",
1264          "name": "logical_type_test",
1265          "fields": [
1266            {
1267              "name": "a",
1268              "type": [
1269                "null",
1270                {
1271                  "type": "long",
1272                  "logicalType": "timestamp-micros"
1273                }
1274              ]
1275            }
1276          ]
1277        }
1278        "#;
1279        let codec = Codec::Deflate(DeflateSettings::default());
1280        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1281        let mut writer = Writer::builder()
1282            .schema(&schema)
1283            .codec(codec)
1284            .writer(Vec::new())
1285            .build();
1286
1287        let mut record1 = Record::new(&schema).unwrap();
1288        record1.put(
1289            "a",
1290            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1291        );
1292
1293        let mut record2 = Record::new(&schema).unwrap();
1294        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1295
1296        let n1 = writer.append(record1)?;
1297        let n2 = writer.append(record2)?;
1298        let n3 = writer.flush()?;
1299        let result = writer.into_inner()?;
1300
1301        assert_eq!(n1 + n2 + n3, result.len());
1302
1303        let mut data = Vec::new();
1304        // byte indicating not null
1305        zig_i64(1, &mut data)?;
1306        zig_i64(1234, &mut data)?;
1307
1308        // byte indicating null
1309        zig_i64(0, &mut data)?;
1310        codec.compress(&mut data)?;
1311
1312        // starts with magic
1313        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1314        // ends with data and sync marker
1315        let last_data_byte = result.len() - 16;
1316        assert_eq!(
1317            &result[last_data_byte - data.len()..last_data_byte],
1318            data.as_slice()
1319        );
1320
1321        Ok(())
1322    }
1323
1324    #[test]
1325    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1326        let schema = Schema::parse_str(SCHEMA)?;
1327        let mut writer = Writer::new(&schema, Vec::new());
1328
1329        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1330        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1331        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1332        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1333
1334        let mut record = Record::new(&schema).unwrap();
1335        record.put("a", 27i64);
1336        record.put("b", "foo");
1337
1338        writer.append(record.clone())?;
1339        writer.append(record.clone())?;
1340        writer.flush()?;
1341        let result = writer.into_inner()?;
1342
1343        assert_eq!(result.len(), 260);
1344
1345        Ok(())
1346    }
1347
1348    #[test]
1349    fn test_avro_3881_metadata_empty_body() -> TestResult {
1350        let schema = Schema::parse_str(SCHEMA)?;
1351        let mut writer = Writer::new(&schema, Vec::new());
1352        writer.add_user_metadata("a".to_string(), "b")?;
1353        let result = writer.into_inner()?;
1354
1355        let reader = Reader::with_schema(&schema, &result[..])?;
1356        let mut expected = HashMap::new();
1357        expected.insert("a".to_string(), vec![b'b']);
1358        assert_eq!(reader.user_metadata(), &expected);
1359        assert_eq!(reader.into_iter().count(), 0);
1360
1361        Ok(())
1362    }
1363
1364    #[test]
1365    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1366        let schema = Schema::parse_str(SCHEMA)?;
1367        let mut writer = Writer::new(&schema, Vec::new());
1368
1369        let mut record = Record::new(&schema).unwrap();
1370        record.put("a", 27i64);
1371        record.put("b", "foo");
1372        writer.append(record.clone())?;
1373
1374        match writer
1375            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1376            .map_err(Error::into_details)
1377        {
1378            Err(e @ Details::FileHeaderAlreadyWritten) => {
1379                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1380            }
1381            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1382            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1383        }
1384
1385        Ok(())
1386    }
1387
1388    #[test]
1389    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1390        let schema = Schema::parse_str(SCHEMA)?;
1391        let mut writer = Writer::new(&schema, Vec::new());
1392
1393        let key = "avro.stringKey".to_string();
1394        match writer
1395            .add_user_metadata(key.clone(), "value")
1396            .map_err(Error::into_details)
1397        {
1398            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1399                assert_eq!(
1400                    e.to_string(),
1401                    format!(
1402                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1403                    )
1404                )
1405            }
1406            Err(e) => panic!(
1407                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1408            ),
1409            Ok(_) => {
1410                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1411            }
1412        }
1413
1414        Ok(())
1415    }
1416
1417    #[test]
1418    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1419        let schema = Schema::parse_str(SCHEMA)?;
1420
1421        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1422        user_meta_data.insert(
1423            "stringKey".to_string(),
1424            Value::String("stringValue".to_string()),
1425        );
1426        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1427        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1428
1429        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1430            .writer(Vec::new())
1431            .schema(&schema)
1432            .user_metadata(user_meta_data.clone())
1433            .build();
1434
1435        assert_eq!(writer.user_metadata, user_meta_data);
1436
1437        Ok(())
1438    }
1439
    /// Test value for the single-object writer tests: a long, a double and a list of
    /// strings. Its Avro schema and its conversion into `Value` are provided by the
    /// `AvroSchema` and `From` impls that follow.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
1446
1447    impl AvroSchema for TestSingleObjectWriter {
1448        fn get_schema() -> Schema {
1449            let schema = r#"
1450            {
1451                "type":"record",
1452                "name":"TestSingleObjectWrtierSerialize",
1453                "fields":[
1454                    {
1455                        "name":"a",
1456                        "type":"long"
1457                    },
1458                    {
1459                        "name":"b",
1460                        "type":"double"
1461                    },
1462                    {
1463                        "name":"c",
1464                        "type":{
1465                            "type":"array",
1466                            "items":"string"
1467                        }
1468                    }
1469                ]
1470            }
1471            "#;
1472            Schema::parse_str(schema).unwrap()
1473        }
1474    }
1475
1476    impl From<TestSingleObjectWriter> for Value {
1477        fn from(obj: TestSingleObjectWriter) -> Value {
1478            Value::Record(vec![
1479                ("a".into(), obj.a.into()),
1480                ("b".into(), obj.b.into()),
1481                (
1482                    "c".into(),
1483                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1484                ),
1485            ])
1486        }
1487    }
1488
1489    #[test]
1490    fn test_single_object_writer() -> TestResult {
1491        let mut buf: Vec<u8> = Vec::new();
1492        let obj = TestSingleObjectWriter {
1493            a: 300,
1494            b: 34.555,
1495            c: vec!["cat".into(), "dog".into()],
1496        };
1497        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1498            &TestSingleObjectWriter::get_schema(),
1499            1024,
1500        )
1501        .expect("Should resolve schema");
1502        let value = obj.into();
1503        let written_bytes = writer
1504            .write_value_ref(&value, &mut buf)
1505            .expect("Error serializing properly");
1506
1507        assert!(buf.len() > 10, "no bytes written");
1508        assert_eq!(buf.len(), written_bytes);
1509        assert_eq!(buf[0], 0xC3);
1510        assert_eq!(buf[1], 0x01);
1511        assert_eq!(
1512            &buf[2..10],
1513            &TestSingleObjectWriter::get_schema()
1514                .fingerprint::<Rabin>()
1515                .bytes[..]
1516        );
1517        let mut msg_binary = Vec::new();
1518        encode(
1519            &value,
1520            &TestSingleObjectWriter::get_schema(),
1521            &mut msg_binary,
1522        )
1523        .expect("encode should have failed by here as a dependency of any writing");
1524        assert_eq!(&buf[10..], &msg_binary[..]);
1525
1526        Ok(())
1527    }
1528
1529    #[test]
1530    fn test_single_object_writer_with_header_builder() -> TestResult {
1531        let mut buf: Vec<u8> = Vec::new();
1532        let obj = TestSingleObjectWriter {
1533            a: 300,
1534            b: 34.555,
1535            c: vec!["cat".into(), "dog".into()],
1536        };
1537        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1538        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1539        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1540            &TestSingleObjectWriter::get_schema(),
1541            1024,
1542            header_builder,
1543        )
1544        .expect("Should resolve schema");
1545        let value = obj.into();
1546        writer
1547            .write_value_ref(&value, &mut buf)
1548            .expect("Error serializing properly");
1549
1550        assert_eq!(buf[0], 0x03);
1551        assert_eq!(buf[1], 0x00);
1552        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1553        Ok(())
1554    }
1555
1556    #[test]
1557    fn test_writer_parity() -> TestResult {
1558        let obj1 = TestSingleObjectWriter {
1559            a: 300,
1560            b: 34.555,
1561            c: vec!["cat".into(), "dog".into()],
1562        };
1563
1564        let mut buf1: Vec<u8> = Vec::new();
1565        let mut buf2: Vec<u8> = Vec::new();
1566        let mut buf3: Vec<u8> = Vec::new();
1567
1568        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1569            &TestSingleObjectWriter::get_schema(),
1570            1024,
1571        )
1572        .expect("Should resolve schema");
1573        let mut specific_writer =
1574            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1575                .expect("Resolved should pass");
1576        specific_writer
1577            .write(obj1.clone(), &mut buf1)
1578            .expect("Serialization expected");
1579        specific_writer
1580            .write_value(obj1.clone(), &mut buf2)
1581            .expect("Serialization expected");
1582        generic_writer
1583            .write_value(obj1.into(), &mut buf3)
1584            .expect("Serialization expected");
1585        assert_eq!(buf1, buf2);
1586        assert_eq!(buf1, buf3);
1587
1588        Ok(())
1589    }
1590
1591    #[test]
1592    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1593        const SCHEMA: &str = r#"
1594  {
1595      "type": "record",
1596      "name": "Conference",
1597      "fields": [
1598          {"type": "string", "name": "name"},
1599          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1600      ]
1601  }"#;
1602
1603        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1604        pub struct Conference {
1605            pub name: String,
1606            pub time: Option<i64>,
1607        }
1608
1609        let conf = Conference {
1610            name: "RustConf".to_string(),
1611            time: Some(1234567890),
1612        };
1613
1614        let schema = Schema::parse_str(SCHEMA)?;
1615        let mut writer = Writer::new(&schema, Vec::new());
1616
1617        let bytes = writer.append_ser(conf)?;
1618
1619        assert_eq!(198, bytes);
1620
1621        Ok(())
1622    }
1623
1624    #[test]
1625    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1626        const SCHEMA: &str = r#"
1627  {
1628      "type": "record",
1629      "name": "Conference",
1630      "fields": [
1631          {"type": "string", "name": "name"},
1632          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1633      ]
1634  }"#;
1635
1636        #[derive(Debug, PartialEq, Clone, Serialize)]
1637        pub struct Conference {
1638            pub name: String,
1639            pub time: Option<f64>, // wrong type: f64 instead of i64
1640        }
1641
1642        let conf = Conference {
1643            name: "RustConf".to_string(),
1644            time: Some(12345678.90),
1645        };
1646
1647        let schema = Schema::parse_str(SCHEMA)?;
1648        let mut writer = Writer::new(&schema, Vec::new());
1649
1650        match writer.append_ser(conf) {
1651            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1652            Err(e) => {
1653                assert_eq!(
1654                    e.to_string(),
1655                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
1656                );
1657            }
1658        }
1659        Ok(())
1660    }
1661
1662    #[test]
1663    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1664        const SCHEMA: &str = r#"
1665        {
1666            "type": "record",
1667            "name": "ExampleSchema",
1668            "fields": [
1669                {"name": "exampleField", "type": "string"}
1670            ]
1671        }
1672        "#;
1673
1674        #[derive(Clone, Default)]
1675        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1676
1677        impl TestBuffer {
1678            fn len(&self) -> usize {
1679                self.0.borrow().len()
1680            }
1681        }
1682
1683        impl Write for TestBuffer {
1684            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1685                self.0.borrow_mut().write(buf)
1686            }
1687
1688            fn flush(&mut self) -> std::io::Result<()> {
1689                Ok(())
1690            }
1691        }
1692
1693        let shared_buffer = TestBuffer::default();
1694
1695        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1696
1697        let schema = Schema::parse_str(SCHEMA)?;
1698
1699        let mut writer = Writer::new(&schema, buffered_writer);
1700
1701        let mut record = Record::new(writer.schema()).unwrap();
1702        record.put("exampleField", "value");
1703
1704        writer.append(record)?;
1705        writer.flush()?;
1706
1707        assert_eq!(
1708            shared_buffer.len(),
1709            167,
1710            "the test buffer was not fully written to after Writer::flush was called"
1711        );
1712
1713        Ok(())
1714    }
1715}