apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    ser_schema::SchemaAwareWriteSerializer,
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default for the builder's `block_size`: once the pending block buffer holds at least this many
/// bytes, `append*` flushes it.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Magic bytes written at the very start of every header produced by `Writer::header`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Main interface for writing Avro formatted values.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
///
/// NOTE(maintainers): `into_inner` manually releases every non-`Copy` field; keep it in sync when
/// adding fields here.
pub struct Writer<'a, W: Write> {
    /// Writer schema: serialized into the header and used to validate/encode every value.
    schema: &'a Schema,
    /// Destination for the header, data blocks and sync markers.
    writer: W,
    /// Named-type lookup built from `schema` (or from the `schemata` given to the builder).
    resolved_schema: ResolvedSchema<'a>,
    /// Compression applied to each block on flush; also recorded in the header.
    codec: Codec,
    /// When `buffer` reaches at least this many bytes, the pending block is flushed.
    block_size: usize,
    /// Encoded values of the pending (not yet flushed) block.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// Sync marker written at the end of the header and after every block.
    marker: [u8; 16],
    /// Whether the header was already written (or must be skipped, e.g. when appending).
    has_header: bool,
    /// Extra entries added to the header metadata map.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
170    /// Get a reference to the `Schema` associated to a `Writer`.
171    pub fn schema(&self) -> &'a Schema {
172        self.schema
173    }
174
175    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
176    /// schema validation.
177    ///
178    /// Returns the number of bytes written (it might be 0, see below).
179    ///
180    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
181    /// internal buffering for performance reasons. If you want to be sure the value has been
182    /// written, then call [`flush`](Writer::flush).
183    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184        let n = self.maybe_write_header()?;
185
186        let avro = value.into();
187        self.append_value_ref(&avro).map(|m| m + n)
188    }
189
190    /// Append a compatible value to a `Writer`, also performing schema validation.
191    ///
192    /// Returns the number of bytes written (it might be 0, see below).
193    ///
194    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
195    /// internal buffering for performance reasons. If you want to be sure the value has been
196    /// written, then call [`flush`](Writer::flush).
197    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198        let n = self.maybe_write_header()?;
199
200        write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201        self.num_values += 1;
202
203        if self.buffer.len() >= self.block_size {
204            return self.flush().map(|b| b + n);
205        }
206
207        Ok(n)
208    }
209
210    /// Append anything implementing the `Serialize` trait to a `Writer` for
211    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
212    /// validation.
213    ///
214    /// Returns the number of bytes written.
215    ///
216    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
217    /// internal buffering for performance reasons. If you want to be sure the value has been
218    /// written, then call [`flush`](Writer::flush).
219    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220        let n = self.maybe_write_header()?;
221
222        let mut serializer = SchemaAwareWriteSerializer::new(
223            &mut self.buffer,
224            self.schema,
225            self.resolved_schema.get_names(),
226            None,
227        );
228        value.serialize(&mut serializer)?;
229        self.num_values += 1;
230
231        if self.buffer.len() >= self.block_size {
232            return self.flush().map(|b| b + n);
233        }
234
235        Ok(n)
236    }
237
238    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
239    /// trait), also performing schema validation.
240    ///
241    /// Returns the number of bytes written.
242    ///
243    /// **NOTE**: This function forces the written data to be flushed (an implicit
244    /// call to [`flush`](Writer::flush) is performed).
245    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246    where
247        I: IntoIterator<Item = T>,
248    {
249        /*
250        https://github.com/rust-lang/rfcs/issues/811 :(
251        let mut stream = values
252            .filter_map(|value| value.serialize(&mut self.serializer).ok())
253            .map(|value| value.encode(self.schema))
254            .collect::<Option<Vec<_>>>()
255            .ok_or_else(|| err_msg("value does not match given schema"))?
256            .into_iter()
257            .fold(Vec::new(), |mut acc, stream| {
258                num_values += 1;
259                acc.extend(stream); acc
260            });
261        */
262
263        let mut num_bytes = 0;
264        for value in values {
265            num_bytes += self.append(value)?;
266        }
267        num_bytes += self.flush()?;
268
269        Ok(num_bytes)
270    }
271
272    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
273    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
274    /// validation.
275    ///
276    /// Returns the number of bytes written.
277    ///
278    /// **NOTE**: This function forces the written data to be flushed (an implicit
279    /// call to [`flush`](Writer::flush) is performed).
280    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281    where
282        I: IntoIterator<Item = T>,
283    {
284        /*
285        https://github.com/rust-lang/rfcs/issues/811 :(
286        let mut stream = values
287            .filter_map(|value| value.serialize(&mut self.serializer).ok())
288            .map(|value| value.encode(self.schema))
289            .collect::<Option<Vec<_>>>()
290            .ok_or_else(|| err_msg("value does not match given schema"))?
291            .into_iter()
292            .fold(Vec::new(), |mut acc, stream| {
293                num_values += 1;
294                acc.extend(stream); acc
295            });
296        */
297
298        let mut num_bytes = 0;
299        for value in values {
300            num_bytes += self.append_ser(value)?;
301        }
302        num_bytes += self.flush()?;
303
304        Ok(num_bytes)
305    }
306
307    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
308    /// validation on each value appended.
309    ///
310    /// Returns the number of bytes written.
311    ///
312    /// **NOTE**: This function forces the written data to be flushed (an implicit
313    /// call to [`flush`](Writer::flush) is performed).
314    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315        let mut num_bytes = 0;
316        for value in values {
317            num_bytes += self.append_value_ref(value)?;
318        }
319        num_bytes += self.flush()?;
320
321        Ok(num_bytes)
322    }
323
324    /// Flush the content to the inner `Writer`.
325    ///
326    /// Call this function to make sure all the content has been written before releasing the `Writer`.
327    /// This will also write the header if it wasn't written yet and hasn't been disabled using
328    /// [`WriterBuilder::has_header`].
329    ///
330    /// Returns the number of bytes written.
331    pub fn flush(&mut self) -> AvroResult<usize> {
332        let mut num_bytes = self.maybe_write_header()?;
333        if self.num_values == 0 {
334            return Ok(num_bytes);
335        }
336
337        self.codec.compress(&mut self.buffer)?;
338
339        let num_values = self.num_values;
340        let stream_len = self.buffer.len();
341
342        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343            + self.append_raw(&stream_len.into(), &Schema::Long)?
344            + self
345                .writer
346                .write(self.buffer.as_ref())
347                .map_err(Details::WriteBytes)?
348            + self.append_marker()?;
349
350        self.buffer.clear();
351        self.num_values = 0;
352
353        self.writer.flush().map_err(Details::FlushWriter)?;
354
355        Ok(num_bytes)
356    }
357
    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// **NOTE**: This function forces the written data to be flushed (an implicit
    /// call to [`flush`](Writer::flush) is performed).
    ///
    /// # Errors
    ///
    /// Returns the error from writing the header or flushing. In that case `self` is dropped on
    /// the early return, so the `Drop` impl makes one more best-effort flush attempt.
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.maybe_write_header()?;
        self.flush()?;

        // Wrapping in ManuallyDrop stops `Writer`'s `Drop` impl (which would flush again) and
        // field destructors from running; every field needing cleanup is released by hand below.
        let mut this = ManuallyDrop::new(self);

        // Extract every member that is not Copy and therefore should be dropped
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);
        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
        // NOTE(review): the remaining fields (`schema` reference, `codec`, `block_size`,
        // `num_values`, `marker`, `has_header`) are assumed to have no drop glue; `codec` is used
        // by value through `&self` in `header`, which implies it is `Copy`.

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
379
380    /// Gets a reference to the underlying writer.
381    ///
382    /// **NOTE**: There is likely data still in the buffer. To have all the data
383    /// in the writer call [`flush`](Writer::flush) first.
384    pub fn get_ref(&self) -> &W {
385        &self.writer
386    }
387
388    /// Gets a mutable reference to the underlying writer.
389    ///
390    /// It is inadvisable to directly write to the underlying writer.
391    ///
392    /// **NOTE**: There is likely data still in the buffer. To have all the data
393    /// in the writer call [`flush`](Writer::flush) first.
394    pub fn get_mut(&mut self) -> &mut W {
395        &mut self.writer
396    }
397
398    /// Generate and append synchronization marker to the payload.
399    fn append_marker(&mut self) -> AvroResult<usize> {
400        // using .writer.write directly to avoid mutable borrow of self
401        // with ref borrowing of self.marker
402        self.writer
403            .write(&self.marker)
404            .map_err(|e| Details::WriteMarker(e).into())
405    }
406
407    /// Append a raw Avro Value to the payload avoiding to encode it again.
408    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410    }
411
412    /// Append pure bytes to the payload.
413    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414        self.writer
415            .write(bytes)
416            .map_err(|e| Details::WriteBytes(e).into())
417    }
418
419    /// Adds custom metadata to the file.
420    /// This method could be used only before adding the first record to the writer.
421    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422        if !self.has_header {
423            if key.starts_with("avro.") {
424                return Err(Details::InvalidMetadataKey(key).into());
425            }
426            self.user_metadata
427                .insert(key, Value::Bytes(value.as_ref().to_vec()));
428            Ok(())
429        } else {
430            Err(Details::FileHeaderAlreadyWritten.into())
431        }
432    }
433
434    /// Create an Avro header based on schema, codec and sync marker.
435    fn header(&self) -> Result<Vec<u8>, Error> {
436        let schema_bytes = serde_json::to_string(self.schema)
437            .map_err(Details::ConvertJsonToString)?
438            .into_bytes();
439
440        let mut metadata = HashMap::with_capacity(2);
441        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
442        if self.codec != Codec::Null {
443            metadata.insert("avro.codec", self.codec.into());
444        }
445        match self.codec {
446            #[cfg(feature = "bzip")]
447            Codec::Bzip2(settings) => {
448                metadata.insert(
449                    "avro.codec.compression_level",
450                    Value::Bytes(vec![settings.compression_level]),
451                );
452            }
453            #[cfg(feature = "xz")]
454            Codec::Xz(settings) => {
455                metadata.insert(
456                    "avro.codec.compression_level",
457                    Value::Bytes(vec![settings.compression_level]),
458                );
459            }
460            #[cfg(feature = "zstandard")]
461            Codec::Zstandard(settings) => {
462                metadata.insert(
463                    "avro.codec.compression_level",
464                    Value::Bytes(vec![settings.compression_level]),
465                );
466            }
467            _ => {}
468        }
469
470        for (k, v) in &self.user_metadata {
471            metadata.insert(k.as_str(), v.clone());
472        }
473
474        let mut header = Vec::new();
475        header.extend_from_slice(AVRO_OBJECT_HEADER);
476        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
477        header.extend_from_slice(&self.marker);
478
479        Ok(header)
480    }
481
482    fn maybe_write_header(&mut self) -> AvroResult<usize> {
483        if !self.has_header {
484            let header = self.header()?;
485            let n = self.append_bytes(header.as_ref())?;
486            self.has_header = true;
487            Ok(n)
488        } else {
489            Ok(0)
490        }
491    }
492}
493
494impl<W: Write> Drop for Writer<'_, W> {
495    /// Drop the writer, will try to flush ignoring any errors.
496    fn drop(&mut self) {
497        let _ = self.maybe_write_header();
498        let _ = self.flush();
499    }
500}
501
502/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
503/// schema validation.
504///
505/// This is an internal function which gets the bytes buffer where to write as parameter instead of
506/// creating a new one like `to_avro_datum`.
507fn write_avro_datum<T: Into<Value>, W: Write>(
508    schema: &Schema,
509    value: T,
510    writer: &mut W,
511) -> Result<(), Error> {
512    let avro = value.into();
513    if !avro.validate(schema) {
514        return Err(Details::Validation.into());
515    }
516    encode(&avro, schema, writer)?;
517    Ok(())
518}
519
520fn write_avro_datum_schemata<T: Into<Value>>(
521    schema: &Schema,
522    schemata: Vec<&Schema>,
523    value: T,
524    buffer: &mut Vec<u8>,
525) -> AvroResult<usize> {
526    let avro = value.into();
527    let rs = ResolvedSchema::try_from(schemata)?;
528    let names = rs.get_names();
529    let enclosing_namespace = schema.namespace();
530    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
531        return Err(Details::Validation.into());
532    }
533    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
534}
535
/// Writer that encodes messages according to the single object encoding v1 spec
/// Uses an API similar to the current File Writer
/// Writes all object bytes at once, and drains internal buffer
pub struct GenericSingleObjectWriter {
    /// Starts with the header bytes from the header builder; each write appends the encoded
    /// datum, writes everything, then truncates back to the header.
    buffer: Vec<u8>,
    /// Owned copy of the schema plus its named-type lookup, used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
543
544impl GenericSingleObjectWriter {
545    pub fn new_with_capacity(
546        schema: &Schema,
547        initial_buffer_cap: usize,
548    ) -> AvroResult<GenericSingleObjectWriter> {
549        let header_builder = RabinFingerprintHeader::from_schema(schema);
550        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
551    }
552
553    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
554        schema: &Schema,
555        initial_buffer_cap: usize,
556        header_builder: HB,
557    ) -> AvroResult<GenericSingleObjectWriter> {
558        let mut buffer = Vec::with_capacity(initial_buffer_cap);
559        let header = header_builder.build_header();
560        buffer.extend_from_slice(&header);
561
562        Ok(GenericSingleObjectWriter {
563            buffer,
564            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
565        })
566    }
567
568    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
569
570    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
571    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
572        let original_length = self.buffer.len();
573        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
574            Err(Details::IllegalSingleObjectWriterState.into())
575        } else {
576            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
577            writer
578                .write_all(&self.buffer)
579                .map_err(Details::WriteBytes)?;
580            let len = self.buffer.len();
581            self.buffer.truncate(original_length);
582            Ok(len)
583        }
584    }
585
586    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
587    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
588        self.write_value_ref(&v, writer)
589    }
590}
591
592/// Writer that encodes messages according to the single object encoding v1 spec
593pub struct SpecificSingleObjectWriter<T>
594where
595    T: AvroSchema,
596{
597    inner: GenericSingleObjectWriter,
598    schema: Schema,
599    header_written: bool,
600    _model: PhantomData<T>,
601}
602
603impl<T> SpecificSingleObjectWriter<T>
604where
605    T: AvroSchema,
606{
607    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
608        let schema = T::get_schema();
609        Ok(SpecificSingleObjectWriter {
610            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
611            schema,
612            header_written: false,
613            _model: PhantomData,
614        })
615    }
616}
617
618impl<T> SpecificSingleObjectWriter<T>
619where
620    T: AvroSchema + Into<Value>,
621{
622    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
623    /// of bytes written including the header
624    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
625        let v: Value = data.into();
626        self.inner.write_value_ref(&v, writer)
627    }
628}
629
630impl<T> SpecificSingleObjectWriter<T>
631where
632    T: AvroSchema + Serialize,
633{
634    /// Write the referenced `Serialize` object to the provided `Write` object. Returns a result with
635    /// the number of bytes written including the header
636    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
637        let mut bytes_written: usize = 0;
638
639        if !self.header_written {
640            bytes_written += writer
641                .write(self.inner.buffer.as_slice())
642                .map_err(Details::WriteBytes)?;
643            self.header_written = true;
644        }
645
646        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
647
648        Ok(bytes_written)
649    }
650
651    /// Write the Serialize object to the provided Write object. Returns a result with the number
652    /// of bytes written including the header
653    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
654        self.write_ref(&data, writer)
655    }
656}
657
658fn write_value_ref_resolved(
659    schema: &Schema,
660    resolved_schema: &ResolvedSchema,
661    value: &Value,
662    buffer: &mut Vec<u8>,
663) -> AvroResult<usize> {
664    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
665        Some(reason) => Err(Details::ValidationWithReason {
666            value: value.clone(),
667            schema: schema.clone(),
668            reason,
669        }
670        .into()),
671        None => encode_internal(
672            value,
673            schema,
674            resolved_schema.get_names(),
675            &schema.namespace(),
676            buffer,
677        ),
678    }
679}
680
681fn write_value_ref_owned_resolved(
682    resolved_schema: &ResolvedOwnedSchema,
683    value: &Value,
684    buffer: &mut Vec<u8>,
685) -> AvroResult<()> {
686    let root_schema = resolved_schema.get_root_schema();
687    if let Some(reason) = value.validate_internal(
688        root_schema,
689        resolved_schema.get_names(),
690        &root_schema.namespace(),
691    ) {
692        return Err(Details::ValidationWithReason {
693            value: value.clone(),
694            schema: root_schema.clone(),
695            reason,
696        }
697        .into());
698    }
699    encode_internal(
700        value,
701        root_schema,
702        resolved_schema.get_names(),
703        &root_schema.namespace(),
704        buffer,
705    )?;
706    Ok(())
707}
708
709/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
710/// performing schema validation.
711///
712/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
713/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
714/// you are doing, instead.
715pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
716    let mut buffer = Vec::new();
717    write_avro_datum(schema, value, &mut buffer)?;
718    Ok(buffer)
719}
720
721/// Write the referenced [Serialize]able object to the provided [Write] object.
722/// Returns a result with the number of bytes written.
723///
724/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
725/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
726/// if you don't know what you are doing, instead.
727pub fn write_avro_datum_ref<T: Serialize, W: Write>(
728    schema: &Schema,
729    data: &T,
730    writer: &mut W,
731) -> AvroResult<usize> {
732    let names: HashMap<Name, &Schema> = HashMap::new();
733    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
734    let bytes_written = data.serialize(&mut serializer)?;
735    Ok(bytes_written)
736}
737
738/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
739/// performing schema validation.
740/// If the provided `schema` is incomplete then its dependencies must be
741/// provided in `schemata`
742pub fn to_avro_datum_schemata<T: Into<Value>>(
743    schema: &Schema,
744    schemata: Vec<&Schema>,
745    value: T,
746) -> AvroResult<Vec<u8>> {
747    let mut buffer = Vec::new();
748    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
749    Ok(buffer)
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn generate_sync_marker() -> [u8; 16] {
754    let mut marker = [0_u8; 16];
755    std::iter::repeat_with(rand::random)
756        .take(16)
757        .enumerate()
758        .for_each(|(i, n)| marker[i] = n);
759    marker
760}
761
/// Produces a random 16-byte sync marker on wasm32 from four `quad_rand::rand` values, each
/// stored as 4 big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
772
773#[cfg(test)]
774mod tests {
775    use std::{cell::RefCell, rc::Rc};
776
777    use super::*;
778    use crate::{
779        Reader,
780        decimal::Decimal,
781        duration::{Days, Duration, Millis, Months},
782        headers::GlueSchemaUuidHeader,
783        rabin::Rabin,
784        schema::{DecimalSchema, FixedSchema, Name},
785        types::Record,
786        util::zig_i64,
787    };
788    use pretty_assertions::assert_eq;
789    use serde::{Deserialize, Serialize};
790    use uuid::Uuid;
791
792    use crate::{codec::DeflateSettings, error::Details};
793    use apache_avro_test_helper::TestResult;
794
795    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
796
797    const SCHEMA: &str = r#"
798    {
799      "type": "record",
800      "name": "test",
801      "fields": [
802        {
803          "name": "a",
804          "type": "long",
805          "default": 42
806        },
807        {
808          "name": "b",
809          "type": "string"
810        }
811      ]
812    }
813    "#;
814
815    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
816
817    #[test]
818    fn test_to_avro_datum() -> TestResult {
819        let schema = Schema::parse_str(SCHEMA)?;
820        let mut record = Record::new(&schema).unwrap();
821        record.put("a", 27i64);
822        record.put("b", "foo");
823
824        let mut expected = Vec::new();
825        zig_i64(27, &mut expected)?;
826        zig_i64(3, &mut expected)?;
827        expected.extend([b'f', b'o', b'o']);
828
829        assert_eq!(to_avro_datum(&schema, record)?, expected);
830
831        Ok(())
832    }
833
834    #[test]
835    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
836        #[derive(Serialize)]
837        struct TestStruct {
838            a: i64,
839            b: String,
840        }
841
842        let schema = Schema::parse_str(SCHEMA)?;
843        let mut writer: Vec<u8> = Vec::new();
844        let data = TestStruct {
845            a: 27,
846            b: "foo".to_string(),
847        };
848
849        let mut expected = Vec::new();
850        zig_i64(27, &mut expected)?;
851        zig_i64(3, &mut expected)?;
852        expected.extend([b'f', b'o', b'o']);
853
854        let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
855
856        assert_eq!(bytes, expected.len());
857        assert_eq!(writer, expected);
858
859        Ok(())
860    }
861
862    #[test]
863    fn avro_rs_220_flush_write_header() -> TestResult {
864        let schema = Schema::parse_str(SCHEMA)?;
865
866        // By default flush should write the header even if nothing was added yet
867        let mut writer = Writer::new(&schema, Vec::new())?;
868        writer.flush()?;
869        let result = writer.into_inner()?;
870        assert_eq!(result.len(), 147);
871
872        // Unless the user indicates via the builder that the header has already been written
873        let mut writer = Writer::builder()
874            .has_header(true)
875            .schema(&schema)
876            .writer(Vec::new())
877            .build()?;
878        writer.flush()?;
879        let result = writer.into_inner()?;
880        assert_eq!(result.len(), 0);
881
882        Ok(())
883    }
884
885    #[test]
886    fn test_union_not_null() -> TestResult {
887        let schema = Schema::parse_str(UNION_SCHEMA)?;
888        let union = Value::Union(1, Box::new(Value::Long(3)));
889
890        let mut expected = Vec::new();
891        zig_i64(1, &mut expected)?;
892        zig_i64(3, &mut expected)?;
893
894        assert_eq!(to_avro_datum(&schema, union)?, expected);
895
896        Ok(())
897    }
898
899    #[test]
900    fn test_union_null() -> TestResult {
901        let schema = Schema::parse_str(UNION_SCHEMA)?;
902        let union = Value::Union(0, Box::new(Value::Null));
903
904        let mut expected = Vec::new();
905        zig_i64(0, &mut expected)?;
906
907        assert_eq!(to_avro_datum(&schema, union)?, expected);
908
909        Ok(())
910    }
911
912    fn logical_type_test<T: Into<Value> + Clone>(
913        schema_str: &'static str,
914
915        expected_schema: &Schema,
916        value: Value,
917
918        raw_schema: &Schema,
919        raw_value: T,
920    ) -> TestResult {
921        let schema = Schema::parse_str(schema_str)?;
922        assert_eq!(&schema, expected_schema);
923        // The serialized format should be the same as the schema.
924        let ser = to_avro_datum(&schema, value.clone())?;
925        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
926        assert_eq!(ser, raw_ser);
927
928        // Should deserialize from the schema into the logical type.
929        let mut r = ser.as_slice();
930        let de = crate::from_avro_datum(&schema, &mut r, None)?;
931        assert_eq!(de, value);
932        Ok(())
933    }
934
935    #[test]
936    fn date() -> TestResult {
937        logical_type_test(
938            r#"{"type": "int", "logicalType": "date"}"#,
939            &Schema::Date,
940            Value::Date(1_i32),
941            &Schema::Int,
942            1_i32,
943        )
944    }
945
946    #[test]
947    fn time_millis() -> TestResult {
948        logical_type_test(
949            r#"{"type": "int", "logicalType": "time-millis"}"#,
950            &Schema::TimeMillis,
951            Value::TimeMillis(1_i32),
952            &Schema::Int,
953            1_i32,
954        )
955    }
956
957    #[test]
958    fn time_micros() -> TestResult {
959        logical_type_test(
960            r#"{"type": "long", "logicalType": "time-micros"}"#,
961            &Schema::TimeMicros,
962            Value::TimeMicros(1_i64),
963            &Schema::Long,
964            1_i64,
965        )
966    }
967
968    #[test]
969    fn timestamp_millis() -> TestResult {
970        logical_type_test(
971            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
972            &Schema::TimestampMillis,
973            Value::TimestampMillis(1_i64),
974            &Schema::Long,
975            1_i64,
976        )
977    }
978
979    #[test]
980    fn timestamp_micros() -> TestResult {
981        logical_type_test(
982            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
983            &Schema::TimestampMicros,
984            Value::TimestampMicros(1_i64),
985            &Schema::Long,
986            1_i64,
987        )
988    }
989
990    #[test]
991    fn decimal_fixed() -> TestResult {
992        let size = 30;
993        let inner = Schema::Fixed(FixedSchema {
994            name: Name::new("decimal")?,
995            aliases: None,
996            doc: None,
997            size,
998            default: None,
999            attributes: Default::default(),
1000        });
1001        let value = vec![0u8; size];
1002        logical_type_test(
1003            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1004            &Schema::Decimal(DecimalSchema {
1005                precision: 20,
1006                scale: 5,
1007                inner: Box::new(inner.clone()),
1008            }),
1009            Value::Decimal(Decimal::from(value.clone())),
1010            &inner,
1011            Value::Fixed(size, value),
1012        )
1013    }
1014
1015    #[test]
1016    fn decimal_bytes() -> TestResult {
1017        let inner = Schema::Bytes;
1018        let value = vec![0u8; 10];
1019        logical_type_test(
1020            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1021            &Schema::Decimal(DecimalSchema {
1022                precision: 4,
1023                scale: 3,
1024                inner: Box::new(inner.clone()),
1025            }),
1026            Value::Decimal(Decimal::from(value.clone())),
1027            &inner,
1028            value,
1029        )
1030    }
1031
1032    #[test]
1033    fn duration() -> TestResult {
1034        let inner = Schema::Fixed(FixedSchema {
1035            name: Name::new("duration")?,
1036            aliases: None,
1037            doc: None,
1038            size: 12,
1039            default: None,
1040            attributes: Default::default(),
1041        });
1042        let value = Value::Duration(Duration::new(
1043            Months::new(256),
1044            Days::new(512),
1045            Millis::new(1024),
1046        ));
1047        logical_type_test(
1048            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1049            &Schema::Duration,
1050            value,
1051            &inner,
1052            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1053        )
1054    }
1055
1056    #[test]
1057    fn test_writer_append() -> TestResult {
1058        let schema = Schema::parse_str(SCHEMA)?;
1059        let mut writer = Writer::new(&schema, Vec::new())?;
1060
1061        let mut record = Record::new(&schema).unwrap();
1062        record.put("a", 27i64);
1063        record.put("b", "foo");
1064
1065        let n1 = writer.append(record.clone())?;
1066        let n2 = writer.append(record.clone())?;
1067        let n3 = writer.flush()?;
1068        let result = writer.into_inner()?;
1069
1070        assert_eq!(n1 + n2 + n3, result.len());
1071
1072        let mut data = Vec::new();
1073        zig_i64(27, &mut data)?;
1074        zig_i64(3, &mut data)?;
1075        data.extend(b"foo");
1076        data.extend(data.clone());
1077
1078        // starts with magic
1079        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1080        // ends with data and sync marker
1081        let last_data_byte = result.len() - 16;
1082        assert_eq!(
1083            &result[last_data_byte - data.len()..last_data_byte],
1084            data.as_slice()
1085        );
1086
1087        Ok(())
1088    }
1089
1090    #[test]
1091    fn test_writer_extend() -> TestResult {
1092        let schema = Schema::parse_str(SCHEMA)?;
1093        let mut writer = Writer::new(&schema, Vec::new())?;
1094
1095        let mut record = Record::new(&schema).unwrap();
1096        record.put("a", 27i64);
1097        record.put("b", "foo");
1098        let record_copy = record.clone();
1099        let records = vec![record, record_copy];
1100
1101        let n1 = writer.extend(records)?;
1102        let n2 = writer.flush()?;
1103        let result = writer.into_inner()?;
1104
1105        assert_eq!(n1 + n2, result.len());
1106
1107        let mut data = Vec::new();
1108        zig_i64(27, &mut data)?;
1109        zig_i64(3, &mut data)?;
1110        data.extend(b"foo");
1111        data.extend(data.clone());
1112
1113        // starts with magic
1114        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1115        // ends with data and sync marker
1116        let last_data_byte = result.len() - 16;
1117        assert_eq!(
1118            &result[last_data_byte - data.len()..last_data_byte],
1119            data.as_slice()
1120        );
1121
1122        Ok(())
1123    }
1124
1125    #[derive(Debug, Clone, Deserialize, Serialize)]
1126    struct TestSerdeSerialize {
1127        a: i64,
1128        b: String,
1129    }
1130
1131    #[test]
1132    fn test_writer_append_ser() -> TestResult {
1133        let schema = Schema::parse_str(SCHEMA)?;
1134        let mut writer = Writer::new(&schema, Vec::new())?;
1135
1136        let record = TestSerdeSerialize {
1137            a: 27,
1138            b: "foo".to_owned(),
1139        };
1140
1141        let n1 = writer.append_ser(record)?;
1142        let n2 = writer.flush()?;
1143        let result = writer.into_inner()?;
1144
1145        assert_eq!(n1 + n2, result.len());
1146
1147        let mut data = Vec::new();
1148        zig_i64(27, &mut data)?;
1149        zig_i64(3, &mut data)?;
1150        data.extend(b"foo");
1151
1152        // starts with magic
1153        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1154        // ends with data and sync marker
1155        let last_data_byte = result.len() - 16;
1156        assert_eq!(
1157            &result[last_data_byte - data.len()..last_data_byte],
1158            data.as_slice()
1159        );
1160
1161        Ok(())
1162    }
1163
1164    #[test]
1165    fn test_writer_extend_ser() -> TestResult {
1166        let schema = Schema::parse_str(SCHEMA)?;
1167        let mut writer = Writer::new(&schema, Vec::new())?;
1168
1169        let record = TestSerdeSerialize {
1170            a: 27,
1171            b: "foo".to_owned(),
1172        };
1173        let record_copy = record.clone();
1174        let records = vec![record, record_copy];
1175
1176        let n1 = writer.extend_ser(records)?;
1177        let n2 = writer.flush()?;
1178        let result = writer.into_inner()?;
1179
1180        assert_eq!(n1 + n2, result.len());
1181
1182        let mut data = Vec::new();
1183        zig_i64(27, &mut data)?;
1184        zig_i64(3, &mut data)?;
1185        data.extend(b"foo");
1186        data.extend(data.clone());
1187
1188        // starts with magic
1189        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1190        // ends with data and sync marker
1191        let last_data_byte = result.len() - 16;
1192        assert_eq!(
1193            &result[last_data_byte - data.len()..last_data_byte],
1194            data.as_slice()
1195        );
1196
1197        Ok(())
1198    }
1199
1200    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1201        Writer::with_codec(
1202            schema,
1203            Vec::new(),
1204            Codec::Deflate(DeflateSettings::default()),
1205        )
1206    }
1207
1208    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1209        Writer::builder()
1210            .writer(Vec::new())
1211            .schema(schema)
1212            .codec(Codec::Deflate(DeflateSettings::default()))
1213            .block_size(100)
1214            .build()
1215    }
1216
1217    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1218        let mut record = Record::new(schema).unwrap();
1219        record.put("a", 27i64);
1220        record.put("b", "foo");
1221
1222        let n1 = writer.append(record.clone())?;
1223        let n2 = writer.append(record.clone())?;
1224        let n3 = writer.flush()?;
1225        let result = writer.into_inner()?;
1226
1227        assert_eq!(n1 + n2 + n3, result.len());
1228
1229        let mut data = Vec::new();
1230        zig_i64(27, &mut data)?;
1231        zig_i64(3, &mut data)?;
1232        data.extend(b"foo");
1233        data.extend(data.clone());
1234        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1235
1236        // starts with magic
1237        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1238        // ends with data and sync marker
1239        let last_data_byte = result.len() - 16;
1240        assert_eq!(
1241            &result[last_data_byte - data.len()..last_data_byte],
1242            data.as_slice()
1243        );
1244
1245        Ok(())
1246    }
1247
1248    #[test]
1249    fn test_writer_with_codec() -> TestResult {
1250        let schema = Schema::parse_str(SCHEMA)?;
1251        let writer = make_writer_with_codec(&schema)?;
1252        check_writer(writer, &schema)
1253    }
1254
1255    #[test]
1256    fn test_writer_with_builder() -> TestResult {
1257        let schema = Schema::parse_str(SCHEMA)?;
1258        let writer = make_writer_with_builder(&schema)?;
1259        check_writer(writer, &schema)
1260    }
1261
1262    #[test]
1263    fn test_logical_writer() -> TestResult {
1264        const LOGICAL_TYPE_SCHEMA: &str = r#"
1265        {
1266          "type": "record",
1267          "name": "logical_type_test",
1268          "fields": [
1269            {
1270              "name": "a",
1271              "type": [
1272                "null",
1273                {
1274                  "type": "long",
1275                  "logicalType": "timestamp-micros"
1276                }
1277              ]
1278            }
1279          ]
1280        }
1281        "#;
1282        let codec = Codec::Deflate(DeflateSettings::default());
1283        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1284        let mut writer = Writer::builder()
1285            .schema(&schema)
1286            .codec(codec)
1287            .writer(Vec::new())
1288            .build()?;
1289
1290        let mut record1 = Record::new(&schema).unwrap();
1291        record1.put(
1292            "a",
1293            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1294        );
1295
1296        let mut record2 = Record::new(&schema).unwrap();
1297        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1298
1299        let n1 = writer.append(record1)?;
1300        let n2 = writer.append(record2)?;
1301        let n3 = writer.flush()?;
1302        let result = writer.into_inner()?;
1303
1304        assert_eq!(n1 + n2 + n3, result.len());
1305
1306        let mut data = Vec::new();
1307        // byte indicating not null
1308        zig_i64(1, &mut data)?;
1309        zig_i64(1234, &mut data)?;
1310
1311        // byte indicating null
1312        zig_i64(0, &mut data)?;
1313        codec.compress(&mut data)?;
1314
1315        // starts with magic
1316        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1317        // ends with data and sync marker
1318        let last_data_byte = result.len() - 16;
1319        assert_eq!(
1320            &result[last_data_byte - data.len()..last_data_byte],
1321            data.as_slice()
1322        );
1323
1324        Ok(())
1325    }
1326
1327    #[test]
1328    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1329        let schema = Schema::parse_str(SCHEMA)?;
1330        let mut writer = Writer::new(&schema, Vec::new())?;
1331
1332        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1333        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1334        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1335        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1336
1337        let mut record = Record::new(&schema).unwrap();
1338        record.put("a", 27i64);
1339        record.put("b", "foo");
1340
1341        writer.append(record.clone())?;
1342        writer.append(record.clone())?;
1343        writer.flush()?;
1344        let result = writer.into_inner()?;
1345
1346        assert_eq!(result.len(), 244);
1347
1348        Ok(())
1349    }
1350
1351    #[test]
1352    fn test_avro_3881_metadata_empty_body() -> TestResult {
1353        let schema = Schema::parse_str(SCHEMA)?;
1354        let mut writer = Writer::new(&schema, Vec::new())?;
1355        writer.add_user_metadata("a".to_string(), "b")?;
1356        let result = writer.into_inner()?;
1357
1358        let reader = Reader::with_schema(&schema, &result[..])?;
1359        let mut expected = HashMap::new();
1360        expected.insert("a".to_string(), vec![b'b']);
1361        assert_eq!(reader.user_metadata(), &expected);
1362        assert_eq!(reader.into_iter().count(), 0);
1363
1364        Ok(())
1365    }
1366
1367    #[test]
1368    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1369        let schema = Schema::parse_str(SCHEMA)?;
1370        let mut writer = Writer::new(&schema, Vec::new())?;
1371
1372        let mut record = Record::new(&schema).unwrap();
1373        record.put("a", 27i64);
1374        record.put("b", "foo");
1375        writer.append(record.clone())?;
1376
1377        match writer
1378            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1379            .map_err(Error::into_details)
1380        {
1381            Err(e @ Details::FileHeaderAlreadyWritten) => {
1382                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1383            }
1384            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1385            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1386        }
1387
1388        Ok(())
1389    }
1390
1391    #[test]
1392    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1393        let schema = Schema::parse_str(SCHEMA)?;
1394        let mut writer = Writer::new(&schema, Vec::new())?;
1395
1396        let key = "avro.stringKey".to_string();
1397        match writer
1398            .add_user_metadata(key.clone(), "value")
1399            .map_err(Error::into_details)
1400        {
1401            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1402                assert_eq!(
1403                    e.to_string(),
1404                    format!(
1405                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1406                    )
1407                )
1408            }
1409            Err(e) => panic!(
1410                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1411            ),
1412            Ok(_) => {
1413                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1414            }
1415        }
1416
1417        Ok(())
1418    }
1419
1420    #[test]
1421    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1422        let schema = Schema::parse_str(SCHEMA)?;
1423
1424        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1425        user_meta_data.insert(
1426            "stringKey".to_string(),
1427            Value::String("stringValue".to_string()),
1428        );
1429        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1430        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1431
1432        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1433            .writer(Vec::new())
1434            .schema(&schema)
1435            .user_metadata(user_meta_data.clone())
1436            .build()?;
1437
1438        assert_eq!(writer.user_metadata, user_meta_data);
1439
1440        Ok(())
1441    }
1442
1443    #[derive(Serialize, Clone)]
1444    struct TestSingleObjectWriter {
1445        a: i64,
1446        b: f64,
1447        c: Vec<String>,
1448    }
1449
1450    impl AvroSchema for TestSingleObjectWriter {
1451        fn get_schema() -> Schema {
1452            let schema = r#"
1453            {
1454                "type":"record",
1455                "name":"TestSingleObjectWrtierSerialize",
1456                "fields":[
1457                    {
1458                        "name":"a",
1459                        "type":"long"
1460                    },
1461                    {
1462                        "name":"b",
1463                        "type":"double"
1464                    },
1465                    {
1466                        "name":"c",
1467                        "type":{
1468                            "type":"array",
1469                            "items":"string"
1470                        }
1471                    }
1472                ]
1473            }
1474            "#;
1475            Schema::parse_str(schema).unwrap()
1476        }
1477    }
1478
1479    impl From<TestSingleObjectWriter> for Value {
1480        fn from(obj: TestSingleObjectWriter) -> Value {
1481            Value::Record(vec![
1482                ("a".into(), obj.a.into()),
1483                ("b".into(), obj.b.into()),
1484                (
1485                    "c".into(),
1486                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1487                ),
1488            ])
1489        }
1490    }
1491
1492    #[test]
1493    fn test_single_object_writer() -> TestResult {
1494        let mut buf: Vec<u8> = Vec::new();
1495        let obj = TestSingleObjectWriter {
1496            a: 300,
1497            b: 34.555,
1498            c: vec!["cat".into(), "dog".into()],
1499        };
1500        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1501            &TestSingleObjectWriter::get_schema(),
1502            1024,
1503        )
1504        .expect("Should resolve schema");
1505        let value = obj.into();
1506        let written_bytes = writer
1507            .write_value_ref(&value, &mut buf)
1508            .expect("Error serializing properly");
1509
1510        assert!(buf.len() > 10, "no bytes written");
1511        assert_eq!(buf.len(), written_bytes);
1512        assert_eq!(buf[0], 0xC3);
1513        assert_eq!(buf[1], 0x01);
1514        assert_eq!(
1515            &buf[2..10],
1516            &TestSingleObjectWriter::get_schema()
1517                .fingerprint::<Rabin>()
1518                .bytes[..]
1519        );
1520        let mut msg_binary = Vec::new();
1521        encode(
1522            &value,
1523            &TestSingleObjectWriter::get_schema(),
1524            &mut msg_binary,
1525        )
1526        .expect("encode should have failed by here as a dependency of any writing");
1527        assert_eq!(&buf[10..], &msg_binary[..]);
1528
1529        Ok(())
1530    }
1531
1532    #[test]
1533    fn test_single_object_writer_with_header_builder() -> TestResult {
1534        let mut buf: Vec<u8> = Vec::new();
1535        let obj = TestSingleObjectWriter {
1536            a: 300,
1537            b: 34.555,
1538            c: vec!["cat".into(), "dog".into()],
1539        };
1540        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1541        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1542        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1543            &TestSingleObjectWriter::get_schema(),
1544            1024,
1545            header_builder,
1546        )
1547        .expect("Should resolve schema");
1548        let value = obj.into();
1549        writer
1550            .write_value_ref(&value, &mut buf)
1551            .expect("Error serializing properly");
1552
1553        assert_eq!(buf[0], 0x03);
1554        assert_eq!(buf[1], 0x00);
1555        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1556        Ok(())
1557    }
1558
1559    #[test]
1560    fn test_writer_parity() -> TestResult {
1561        let obj1 = TestSingleObjectWriter {
1562            a: 300,
1563            b: 34.555,
1564            c: vec!["cat".into(), "dog".into()],
1565        };
1566
1567        let mut buf1: Vec<u8> = Vec::new();
1568        let mut buf2: Vec<u8> = Vec::new();
1569        let mut buf3: Vec<u8> = Vec::new();
1570
1571        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1572            &TestSingleObjectWriter::get_schema(),
1573            1024,
1574        )
1575        .expect("Should resolve schema");
1576        let mut specific_writer =
1577            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1578                .expect("Resolved should pass");
1579        specific_writer
1580            .write(obj1.clone(), &mut buf1)
1581            .expect("Serialization expected");
1582        specific_writer
1583            .write_value(obj1.clone(), &mut buf2)
1584            .expect("Serialization expected");
1585        generic_writer
1586            .write_value(obj1.into(), &mut buf3)
1587            .expect("Serialization expected");
1588        assert_eq!(buf1, buf2);
1589        assert_eq!(buf1, buf3);
1590
1591        Ok(())
1592    }
1593
1594    #[test]
1595    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1596        const SCHEMA: &str = r#"
1597  {
1598      "type": "record",
1599      "name": "Conference",
1600      "fields": [
1601          {"type": "string", "name": "name"},
1602          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1603      ]
1604  }"#;
1605
1606        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1607        pub struct Conference {
1608            pub name: String,
1609            pub time: Option<i64>,
1610        }
1611
1612        let conf = Conference {
1613            name: "RustConf".to_string(),
1614            time: Some(1234567890),
1615        };
1616
1617        let schema = Schema::parse_str(SCHEMA)?;
1618        let mut writer = Writer::new(&schema, Vec::new())?;
1619
1620        let bytes = writer.append_ser(conf)?;
1621
1622        assert_eq!(182, bytes);
1623
1624        Ok(())
1625    }
1626
    /// Serializing a value whose type does not match the schema (f64 into a `["null", "long"]`
    /// union) must fail, and the error text must name the field, the record schema, the value,
    /// and the cause.
    #[test]
    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
        const SCHEMA: &str = r#"
  {
      "type": "record",
      "name": "Conference",
      "fields": [
          {"type": "string", "name": "name"},
          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
      ]
  }"#;

        // `time` reaches the schema field `date` through its aliases, but carries the wrong type.
        #[derive(Debug, PartialEq, Clone, Serialize)]
        pub struct Conference {
            pub name: String,
            pub time: Option<f64>, // wrong type: f64 instead of i64
        }

        let conf = Conference {
            name: "RustConf".to_string(),
            time: Some(12345678.90),
        };

        let schema = Schema::parse_str(SCHEMA)?;
        let mut writer = Writer::new(&schema, Vec::new())?;

        // The full Display output of the error is pinned, so any change to the error
        // formatting or to the schema's Debug representation will show up here.
        match writer.append_ser(conf) {
            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
            Err(e) => {
                assert_eq!(
                    e.to_string(),
                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
                );
            }
        }
        Ok(())
    }
1664
1665    #[test]
1666    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1667        const SCHEMA: &str = r#"
1668        {
1669            "type": "record",
1670            "name": "ExampleSchema",
1671            "fields": [
1672                {"name": "exampleField", "type": "string"}
1673            ]
1674        }
1675        "#;
1676
1677        #[derive(Clone, Default)]
1678        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1679
1680        impl TestBuffer {
1681            fn len(&self) -> usize {
1682                self.0.borrow().len()
1683            }
1684        }
1685
1686        impl Write for TestBuffer {
1687            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1688                self.0.borrow_mut().write(buf)
1689            }
1690
1691            fn flush(&mut self) -> std::io::Result<()> {
1692                Ok(())
1693            }
1694        }
1695
1696        let shared_buffer = TestBuffer::default();
1697
1698        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1699
1700        let schema = Schema::parse_str(SCHEMA)?;
1701
1702        let mut writer = Writer::new(&schema, buffered_writer)?;
1703
1704        let mut record = Record::new(writer.schema()).unwrap();
1705        record.put("exampleField", "value");
1706
1707        writer.append(record)?;
1708        writer.flush()?;
1709
1710        assert_eq!(
1711            shared_buffer.len(),
1712            151,
1713            "the test buffer was not fully written to after Writer::flush was called"
1714        );
1715
1716        Ok(())
1717    }
1718}