apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    ser_schema::SchemaAwareWriteSerializer,
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default value of the builder's `block_size` option: once the in-memory buffer of a
/// [`Writer`] holds at least this many bytes, appending a value triggers a flush.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Bytes placed at the very beginning of the header produced by a [`Writer`].
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Main interface for writing Avro formatted values.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
pub struct Writer<'a, W: Write> {
    // Schema used to validate/encode appended values; also serialized into the header.
    schema: &'a Schema,
    // Destination that the header, blocks and sync markers are written to.
    writer: W,
    // Resolved from the builder's `schemata` when provided, otherwise from `schema`.
    resolved_schema: ResolvedSchema<'a>,
    // Codec applied to the buffer on flush and recorded in the header.
    codec: Codec,
    // Buffer length (in bytes) at which an append triggers a flush.
    block_size: usize,
    // Encoded values that have been appended but not flushed yet.
    buffer: Vec<u8>,
    // Number of values currently held in `buffer`.
    num_values: usize,
    // 16-byte sync marker written in the header and after every block.
    marker: [u8; 16],
    // `true` once the header was written (or when the builder was told it already exists).
    has_header: bool,
    // Extra key/value entries merged into the header metadata.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
170    /// Get a reference to the `Schema` associated to a `Writer`.
171    pub fn schema(&self) -> &'a Schema {
172        self.schema
173    }
174
175    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
176    /// schema validation.
177    ///
178    /// Returns the number of bytes written (it might be 0, see below).
179    ///
180    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
181    /// internal buffering for performance reasons. If you want to be sure the value has been
182    /// written, then call [`flush`](Writer::flush).
183    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
184        let n = self.maybe_write_header()?;
185
186        let avro = value.into();
187        self.append_value_ref(&avro).map(|m| m + n)
188    }
189
190    /// Append a compatible value to a `Writer`, also performing schema validation.
191    ///
192    /// Returns the number of bytes written (it might be 0, see below).
193    ///
194    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
195    /// internal buffering for performance reasons. If you want to be sure the value has been
196    /// written, then call [`flush`](Writer::flush).
197    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
198        let n = self.maybe_write_header()?;
199
200        write_value_ref_resolved(self.schema, &self.resolved_schema, value, &mut self.buffer)?;
201        self.num_values += 1;
202
203        if self.buffer.len() >= self.block_size {
204            return self.flush().map(|b| b + n);
205        }
206
207        Ok(n)
208    }
209
210    /// Append anything implementing the `Serialize` trait to a `Writer` for
211    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
212    /// validation.
213    ///
214    /// Returns the number of bytes written.
215    ///
216    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
217    /// internal buffering for performance reasons. If you want to be sure the value has been
218    /// written, then call [`flush`](Writer::flush).
219    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
220        let n = self.maybe_write_header()?;
221
222        let mut serializer = SchemaAwareWriteSerializer::new(
223            &mut self.buffer,
224            self.schema,
225            self.resolved_schema.get_names(),
226            None,
227        );
228        value.serialize(&mut serializer)?;
229        self.num_values += 1;
230
231        if self.buffer.len() >= self.block_size {
232            return self.flush().map(|b| b + n);
233        }
234
235        Ok(n)
236    }
237
238    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
239    /// trait), also performing schema validation.
240    ///
241    /// Returns the number of bytes written.
242    ///
243    /// **NOTE**: This function forces the written data to be flushed (an implicit
244    /// call to [`flush`](Writer::flush) is performed).
245    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
246    where
247        I: IntoIterator<Item = T>,
248    {
249        /*
250        https://github.com/rust-lang/rfcs/issues/811 :(
251        let mut stream = values
252            .filter_map(|value| value.serialize(&mut self.serializer).ok())
253            .map(|value| value.encode(self.schema))
254            .collect::<Option<Vec<_>>>()
255            .ok_or_else(|| err_msg("value does not match given schema"))?
256            .into_iter()
257            .fold(Vec::new(), |mut acc, stream| {
258                num_values += 1;
259                acc.extend(stream); acc
260            });
261        */
262
263        let mut num_bytes = 0;
264        for value in values {
265            num_bytes += self.append(value)?;
266        }
267        num_bytes += self.flush()?;
268
269        Ok(num_bytes)
270    }
271
272    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
273    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
274    /// validation.
275    ///
276    /// Returns the number of bytes written.
277    ///
278    /// **NOTE**: This function forces the written data to be flushed (an implicit
279    /// call to [`flush`](Writer::flush) is performed).
280    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
281    where
282        I: IntoIterator<Item = T>,
283    {
284        /*
285        https://github.com/rust-lang/rfcs/issues/811 :(
286        let mut stream = values
287            .filter_map(|value| value.serialize(&mut self.serializer).ok())
288            .map(|value| value.encode(self.schema))
289            .collect::<Option<Vec<_>>>()
290            .ok_or_else(|| err_msg("value does not match given schema"))?
291            .into_iter()
292            .fold(Vec::new(), |mut acc, stream| {
293                num_values += 1;
294                acc.extend(stream); acc
295            });
296        */
297
298        let mut num_bytes = 0;
299        for value in values {
300            num_bytes += self.append_ser(value)?;
301        }
302        num_bytes += self.flush()?;
303
304        Ok(num_bytes)
305    }
306
307    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
308    /// validation on each value appended.
309    ///
310    /// Returns the number of bytes written.
311    ///
312    /// **NOTE**: This function forces the written data to be flushed (an implicit
313    /// call to [`flush`](Writer::flush) is performed).
314    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
315        let mut num_bytes = 0;
316        for value in values {
317            num_bytes += self.append_value_ref(value)?;
318        }
319        num_bytes += self.flush()?;
320
321        Ok(num_bytes)
322    }
323
324    /// Flush the content to the inner `Writer`.
325    ///
326    /// Call this function to make sure all the content has been written before releasing the `Writer`.
327    /// This will also write the header if it wasn't written yet and hasn't been disabled using
328    /// [`WriterBuilder::has_header`].
329    ///
330    /// Returns the number of bytes written.
331    pub fn flush(&mut self) -> AvroResult<usize> {
332        let mut num_bytes = self.maybe_write_header()?;
333        if self.num_values == 0 {
334            return Ok(num_bytes);
335        }
336
337        self.codec.compress(&mut self.buffer)?;
338
339        let num_values = self.num_values;
340        let stream_len = self.buffer.len();
341
342        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
343            + self.append_raw(&stream_len.into(), &Schema::Long)?
344            + self
345                .writer
346                .write(self.buffer.as_ref())
347                .map_err(Details::WriteBytes)?
348            + self.append_marker()?;
349
350        self.buffer.clear();
351        self.num_values = 0;
352
353        self.writer.flush().map_err(Details::FlushWriter)?;
354
355        Ok(num_bytes)
356    }
357
    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// **NOTE**: This function forces the written data to be flushed (an implicit
    /// call to [`flush`](Writer::flush) is performed).
    ///
    /// # Errors
    ///
    /// Returns the error of writing the header or flushing; in that case the `Writer` is
    /// dropped normally (its `Drop` impl runs) and the inner writer is not returned.
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.maybe_write_header()?;
        self.flush()?;

        // `Writer` implements `Drop`, so its fields cannot simply be moved out. Wrapping it in
        // `ManuallyDrop` suppresses that destructor; the fields are then released by hand below.
        let mut this = ManuallyDrop::new(self);

        // Extract every member that is not Copy and therefore should be dropped
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);
        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
379
380    /// Gets a reference to the underlying writer.
381    ///
382    /// **NOTE**: There is likely data still in the buffer. To have all the data
383    /// in the writer call [`flush`](Writer::flush) first.
384    pub fn get_ref(&self) -> &W {
385        &self.writer
386    }
387
388    /// Gets a mutable reference to the underlying writer.
389    ///
390    /// It is inadvisable to directly write to the underlying writer.
391    ///
392    /// **NOTE**: There is likely data still in the buffer. To have all the data
393    /// in the writer call [`flush`](Writer::flush) first.
394    pub fn get_mut(&mut self) -> &mut W {
395        &mut self.writer
396    }
397
398    /// Generate and append synchronization marker to the payload.
399    fn append_marker(&mut self) -> AvroResult<usize> {
400        // using .writer.write directly to avoid mutable borrow of self
401        // with ref borrowing of self.marker
402        self.writer
403            .write(&self.marker)
404            .map_err(|e| Details::WriteMarker(e).into())
405    }
406
407    /// Append a raw Avro Value to the payload avoiding to encode it again.
408    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
409        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
410    }
411
412    /// Append pure bytes to the payload.
413    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
414        self.writer
415            .write(bytes)
416            .map_err(|e| Details::WriteBytes(e).into())
417    }
418
419    /// Adds custom metadata to the file.
420    /// This method could be used only before adding the first record to the writer.
421    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
422        if !self.has_header {
423            if key.starts_with("avro.") {
424                return Err(Details::InvalidMetadataKey(key).into());
425            }
426            self.user_metadata
427                .insert(key, Value::Bytes(value.as_ref().to_vec()));
428            Ok(())
429        } else {
430            Err(Details::FileHeaderAlreadyWritten.into())
431        }
432    }
433
434    /// Create an Avro header based on schema, codec and sync marker.
435    fn header(&self) -> Result<Vec<u8>, Error> {
436        let schema_bytes = serde_json::to_string(self.schema)
437            .map_err(Details::ConvertJsonToString)?
438            .into_bytes();
439
440        let mut metadata = HashMap::with_capacity(2);
441        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
442        metadata.insert("avro.codec", self.codec.into());
443        match self.codec {
444            #[cfg(feature = "bzip")]
445            Codec::Bzip2(settings) => {
446                metadata.insert(
447                    "avro.codec.compression_level",
448                    Value::Bytes(vec![settings.compression_level]),
449                );
450            }
451            #[cfg(feature = "xz")]
452            Codec::Xz(settings) => {
453                metadata.insert(
454                    "avro.codec.compression_level",
455                    Value::Bytes(vec![settings.compression_level]),
456                );
457            }
458            #[cfg(feature = "zstandard")]
459            Codec::Zstandard(settings) => {
460                metadata.insert(
461                    "avro.codec.compression_level",
462                    Value::Bytes(vec![settings.compression_level]),
463                );
464            }
465            _ => {}
466        }
467
468        for (k, v) in &self.user_metadata {
469            metadata.insert(k.as_str(), v.clone());
470        }
471
472        let mut header = Vec::new();
473        header.extend_from_slice(AVRO_OBJECT_HEADER);
474        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
475        header.extend_from_slice(&self.marker);
476
477        Ok(header)
478    }
479
480    fn maybe_write_header(&mut self) -> AvroResult<usize> {
481        if !self.has_header {
482            let header = self.header()?;
483            let n = self.append_bytes(header.as_ref())?;
484            self.has_header = true;
485            Ok(n)
486        } else {
487            Ok(0)
488        }
489    }
490}
491
492impl<W: Write> Drop for Writer<'_, W> {
493    /// Drop the writer, will try to flush ignoring any errors.
494    fn drop(&mut self) {
495        let _ = self.maybe_write_header();
496        let _ = self.flush();
497    }
498}
499
500/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
501/// schema validation.
502///
503/// This is an internal function which gets the bytes buffer where to write as parameter instead of
504/// creating a new one like `to_avro_datum`.
505fn write_avro_datum<T: Into<Value>, W: Write>(
506    schema: &Schema,
507    value: T,
508    writer: &mut W,
509) -> Result<(), Error> {
510    let avro = value.into();
511    if !avro.validate(schema) {
512        return Err(Details::Validation.into());
513    }
514    encode(&avro, schema, writer)?;
515    Ok(())
516}
517
518fn write_avro_datum_schemata<T: Into<Value>>(
519    schema: &Schema,
520    schemata: Vec<&Schema>,
521    value: T,
522    buffer: &mut Vec<u8>,
523) -> AvroResult<usize> {
524    let avro = value.into();
525    let rs = ResolvedSchema::try_from(schemata)?;
526    let names = rs.get_names();
527    let enclosing_namespace = schema.namespace();
528    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
529        return Err(Details::Validation.into());
530    }
531    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
532}
533
/// Writer that encodes messages according to the single object encoding v1 spec
/// Uses an API similar to the current File Writer
/// Writes all object bytes at once, and drains internal buffer
pub struct GenericSingleObjectWriter {
    // Starts out holding the header bytes; each write appends the encoded value after them,
    // sends the whole buffer to the sink and truncates back to the header.
    buffer: Vec<u8>,
    // Owned, resolved form of the schema used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
541
542impl GenericSingleObjectWriter {
543    pub fn new_with_capacity(
544        schema: &Schema,
545        initial_buffer_cap: usize,
546    ) -> AvroResult<GenericSingleObjectWriter> {
547        let header_builder = RabinFingerprintHeader::from_schema(schema);
548        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
549    }
550
551    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
552        schema: &Schema,
553        initial_buffer_cap: usize,
554        header_builder: HB,
555    ) -> AvroResult<GenericSingleObjectWriter> {
556        let mut buffer = Vec::with_capacity(initial_buffer_cap);
557        let header = header_builder.build_header();
558        buffer.extend_from_slice(&header);
559
560        Ok(GenericSingleObjectWriter {
561            buffer,
562            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
563        })
564    }
565
566    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
567
568    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
569    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
570        let original_length = self.buffer.len();
571        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
572            Err(Details::IllegalSingleObjectWriterState.into())
573        } else {
574            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
575            writer
576                .write_all(&self.buffer)
577                .map_err(Details::WriteBytes)?;
578            let len = self.buffer.len();
579            self.buffer.truncate(original_length);
580            Ok(len)
581        }
582    }
583
584    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
585    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
586        self.write_value_ref(&v, writer)
587    }
588}
589
/// Writer that encodes messages according to the single object encoding v1 spec
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    // Generic writer built from `T`'s schema; its buffer holds the header bytes.
    inner: GenericSingleObjectWriter,
    // Schema obtained from `T::get_schema()`.
    schema: Schema,
    // Set once `write_ref` has emitted the header.
    header_written: bool,
    // Ties the writer to `T` without storing a value of it.
    _model: PhantomData<T>,
}
600
601impl<T> SpecificSingleObjectWriter<T>
602where
603    T: AvroSchema,
604{
605    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
606        let schema = T::get_schema();
607        Ok(SpecificSingleObjectWriter {
608            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
609            schema,
610            header_written: false,
611            _model: PhantomData,
612        })
613    }
614}
615
616impl<T> SpecificSingleObjectWriter<T>
617where
618    T: AvroSchema + Into<Value>,
619{
620    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
621    /// of bytes written including the header
622    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
623        let v: Value = data.into();
624        self.inner.write_value_ref(&v, writer)
625    }
626}
627
628impl<T> SpecificSingleObjectWriter<T>
629where
630    T: AvroSchema + Serialize,
631{
632    /// Write the referenced `Serialize` object to the provided `Write` object. Returns a result with
633    /// the number of bytes written including the header
634    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
635        let mut bytes_written: usize = 0;
636
637        if !self.header_written {
638            bytes_written += writer
639                .write(self.inner.buffer.as_slice())
640                .map_err(Details::WriteBytes)?;
641            self.header_written = true;
642        }
643
644        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
645
646        Ok(bytes_written)
647    }
648
649    /// Write the Serialize object to the provided Write object. Returns a result with the number
650    /// of bytes written including the header
651    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
652        self.write_ref(&data, writer)
653    }
654}
655
656fn write_value_ref_resolved(
657    schema: &Schema,
658    resolved_schema: &ResolvedSchema,
659    value: &Value,
660    buffer: &mut Vec<u8>,
661) -> AvroResult<usize> {
662    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
663        Some(reason) => Err(Details::ValidationWithReason {
664            value: value.clone(),
665            schema: schema.clone(),
666            reason,
667        }
668        .into()),
669        None => encode_internal(
670            value,
671            schema,
672            resolved_schema.get_names(),
673            &schema.namespace(),
674            buffer,
675        ),
676    }
677}
678
679fn write_value_ref_owned_resolved(
680    resolved_schema: &ResolvedOwnedSchema,
681    value: &Value,
682    buffer: &mut Vec<u8>,
683) -> AvroResult<()> {
684    let root_schema = resolved_schema.get_root_schema();
685    if let Some(reason) = value.validate_internal(
686        root_schema,
687        resolved_schema.get_names(),
688        &root_schema.namespace(),
689    ) {
690        return Err(Details::ValidationWithReason {
691            value: value.clone(),
692            schema: root_schema.clone(),
693            reason,
694        }
695        .into());
696    }
697    encode_internal(
698        value,
699        root_schema,
700        resolved_schema.get_names(),
701        &root_schema.namespace(),
702        buffer,
703    )?;
704    Ok(())
705}
706
707/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
708/// performing schema validation.
709///
710/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
711/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
712/// you are doing, instead.
713pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
714    let mut buffer = Vec::new();
715    write_avro_datum(schema, value, &mut buffer)?;
716    Ok(buffer)
717}
718
719/// Write the referenced [Serialize]able object to the provided [Write] object.
720/// Returns a result with the number of bytes written.
721///
722/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
723/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
724/// if you don't know what you are doing, instead.
725pub fn write_avro_datum_ref<T: Serialize, W: Write>(
726    schema: &Schema,
727    data: &T,
728    writer: &mut W,
729) -> AvroResult<usize> {
730    let names: HashMap<Name, &Schema> = HashMap::new();
731    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
732    let bytes_written = data.serialize(&mut serializer)?;
733    Ok(bytes_written)
734}
735
736/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
737/// performing schema validation.
738/// If the provided `schema` is incomplete then its dependencies must be
739/// provided in `schemata`
740pub fn to_avro_datum_schemata<T: Into<Value>>(
741    schema: &Schema,
742    schemata: Vec<&Schema>,
743    value: T,
744) -> AvroResult<Vec<u8>> {
745    let mut buffer = Vec::new();
746    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
747    Ok(buffer)
748}
749
750#[cfg(not(target_arch = "wasm32"))]
751fn generate_sync_marker() -> [u8; 16] {
752    let mut marker = [0_u8; 16];
753    std::iter::repeat_with(rand::random)
754        .take(16)
755        .enumerate()
756        .for_each(|(i, n)| marker[i] = n);
757    marker
758}
759
/// Produces a 16-byte sync marker from four `quad_rand::rand` draws, each laid out as
/// big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
770
771#[cfg(test)]
772mod tests {
773    use std::{cell::RefCell, rc::Rc};
774
775    use super::*;
776    use crate::{
777        Reader,
778        decimal::Decimal,
779        duration::{Days, Duration, Millis, Months},
780        headers::GlueSchemaUuidHeader,
781        rabin::Rabin,
782        schema::{DecimalSchema, FixedSchema, Name},
783        types::Record,
784        util::zig_i64,
785    };
786    use pretty_assertions::assert_eq;
787    use serde::{Deserialize, Serialize};
788    use uuid::Uuid;
789
790    use crate::{codec::DeflateSettings, error::Details};
791    use apache_avro_test_helper::TestResult;
792
    /// Length in bytes of the `AVRO_OBJECT_HEADER` magic prefix.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    /// Record schema with a `long` field `a` (default 42) and a `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// Union schema whose branches are `null` (index 0) and `long` (index 1).
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
814
815    #[test]
816    fn test_to_avro_datum() -> TestResult {
817        let schema = Schema::parse_str(SCHEMA)?;
818        let mut record = Record::new(&schema).unwrap();
819        record.put("a", 27i64);
820        record.put("b", "foo");
821
822        let mut expected = Vec::new();
823        zig_i64(27, &mut expected)?;
824        zig_i64(3, &mut expected)?;
825        expected.extend([b'f', b'o', b'o']);
826
827        assert_eq!(to_avro_datum(&schema, record)?, expected);
828
829        Ok(())
830    }
831
832    #[test]
833    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
834        #[derive(Serialize)]
835        struct TestStruct {
836            a: i64,
837            b: String,
838        }
839
840        let schema = Schema::parse_str(SCHEMA)?;
841        let mut writer: Vec<u8> = Vec::new();
842        let data = TestStruct {
843            a: 27,
844            b: "foo".to_string(),
845        };
846
847        let mut expected = Vec::new();
848        zig_i64(27, &mut expected)?;
849        zig_i64(3, &mut expected)?;
850        expected.extend([b'f', b'o', b'o']);
851
852        let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
853
854        assert_eq!(bytes, expected.len());
855        assert_eq!(writer, expected);
856
857        Ok(())
858    }
859
860    #[test]
861    fn avro_rs_220_flush_write_header() -> TestResult {
862        let schema = Schema::parse_str(SCHEMA)?;
863
864        // By default flush should write the header even if nothing was added yet
865        let mut writer = Writer::new(&schema, Vec::new())?;
866        writer.flush()?;
867        let result = writer.into_inner()?;
868        assert_eq!(result.len(), 163);
869
870        // Unless the user indicates via the builder that the header has already been written
871        let mut writer = Writer::builder()
872            .has_header(true)
873            .schema(&schema)
874            .writer(Vec::new())
875            .build()?;
876        writer.flush()?;
877        let result = writer.into_inner()?;
878        assert_eq!(result.len(), 0);
879
880        Ok(())
881    }
882
883    #[test]
884    fn test_union_not_null() -> TestResult {
885        let schema = Schema::parse_str(UNION_SCHEMA)?;
886        let union = Value::Union(1, Box::new(Value::Long(3)));
887
888        let mut expected = Vec::new();
889        zig_i64(1, &mut expected)?;
890        zig_i64(3, &mut expected)?;
891
892        assert_eq!(to_avro_datum(&schema, union)?, expected);
893
894        Ok(())
895    }
896
897    #[test]
898    fn test_union_null() -> TestResult {
899        let schema = Schema::parse_str(UNION_SCHEMA)?;
900        let union = Value::Union(0, Box::new(Value::Null));
901
902        let mut expected = Vec::new();
903        zig_i64(0, &mut expected)?;
904
905        assert_eq!(to_avro_datum(&schema, union)?, expected);
906
907        Ok(())
908    }
909
910    fn logical_type_test<T: Into<Value> + Clone>(
911        schema_str: &'static str,
912
913        expected_schema: &Schema,
914        value: Value,
915
916        raw_schema: &Schema,
917        raw_value: T,
918    ) -> TestResult {
919        let schema = Schema::parse_str(schema_str)?;
920        assert_eq!(&schema, expected_schema);
921        // The serialized format should be the same as the schema.
922        let ser = to_avro_datum(&schema, value.clone())?;
923        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
924        assert_eq!(ser, raw_ser);
925
926        // Should deserialize from the schema into the logical type.
927        let mut r = ser.as_slice();
928        let de = crate::from_avro_datum(&schema, &mut r, None)?;
929        assert_eq!(de, value);
930        Ok(())
931    }
932
933    #[test]
934    fn date() -> TestResult {
935        logical_type_test(
936            r#"{"type": "int", "logicalType": "date"}"#,
937            &Schema::Date,
938            Value::Date(1_i32),
939            &Schema::Int,
940            1_i32,
941        )
942    }
943
944    #[test]
945    fn time_millis() -> TestResult {
946        logical_type_test(
947            r#"{"type": "int", "logicalType": "time-millis"}"#,
948            &Schema::TimeMillis,
949            Value::TimeMillis(1_i32),
950            &Schema::Int,
951            1_i32,
952        )
953    }
954
955    #[test]
956    fn time_micros() -> TestResult {
957        logical_type_test(
958            r#"{"type": "long", "logicalType": "time-micros"}"#,
959            &Schema::TimeMicros,
960            Value::TimeMicros(1_i64),
961            &Schema::Long,
962            1_i64,
963        )
964    }
965
966    #[test]
967    fn timestamp_millis() -> TestResult {
968        logical_type_test(
969            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
970            &Schema::TimestampMillis,
971            Value::TimestampMillis(1_i64),
972            &Schema::Long,
973            1_i64,
974        )
975    }
976
977    #[test]
978    fn timestamp_micros() -> TestResult {
979        logical_type_test(
980            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
981            &Schema::TimestampMicros,
982            Value::TimestampMicros(1_i64),
983            &Schema::Long,
984            1_i64,
985        )
986    }
987
988    #[test]
989    fn decimal_fixed() -> TestResult {
990        let size = 30;
991        let inner = Schema::Fixed(FixedSchema {
992            name: Name::new("decimal")?,
993            aliases: None,
994            doc: None,
995            size,
996            default: None,
997            attributes: Default::default(),
998        });
999        let value = vec![0u8; size];
1000        logical_type_test(
1001            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1002            &Schema::Decimal(DecimalSchema {
1003                precision: 20,
1004                scale: 5,
1005                inner: Box::new(inner.clone()),
1006            }),
1007            Value::Decimal(Decimal::from(value.clone())),
1008            &inner,
1009            Value::Fixed(size, value),
1010        )
1011    }
1012
1013    #[test]
1014    fn decimal_bytes() -> TestResult {
1015        let inner = Schema::Bytes;
1016        let value = vec![0u8; 10];
1017        logical_type_test(
1018            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1019            &Schema::Decimal(DecimalSchema {
1020                precision: 4,
1021                scale: 3,
1022                inner: Box::new(inner.clone()),
1023            }),
1024            Value::Decimal(Decimal::from(value.clone())),
1025            &inner,
1026            value,
1027        )
1028    }
1029
1030    #[test]
1031    fn duration() -> TestResult {
1032        let inner = Schema::Fixed(FixedSchema {
1033            name: Name::new("duration")?,
1034            aliases: None,
1035            doc: None,
1036            size: 12,
1037            default: None,
1038            attributes: Default::default(),
1039        });
1040        let value = Value::Duration(Duration::new(
1041            Months::new(256),
1042            Days::new(512),
1043            Millis::new(1024),
1044        ));
1045        logical_type_test(
1046            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1047            &Schema::Duration,
1048            value,
1049            &inner,
1050            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1051        )
1052    }
1053
1054    #[test]
1055    fn test_writer_append() -> TestResult {
1056        let schema = Schema::parse_str(SCHEMA)?;
1057        let mut writer = Writer::new(&schema, Vec::new())?;
1058
1059        let mut record = Record::new(&schema).unwrap();
1060        record.put("a", 27i64);
1061        record.put("b", "foo");
1062
1063        let n1 = writer.append(record.clone())?;
1064        let n2 = writer.append(record.clone())?;
1065        let n3 = writer.flush()?;
1066        let result = writer.into_inner()?;
1067
1068        assert_eq!(n1 + n2 + n3, result.len());
1069
1070        let mut data = Vec::new();
1071        zig_i64(27, &mut data)?;
1072        zig_i64(3, &mut data)?;
1073        data.extend(b"foo");
1074        data.extend(data.clone());
1075
1076        // starts with magic
1077        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1078        // ends with data and sync marker
1079        let last_data_byte = result.len() - 16;
1080        assert_eq!(
1081            &result[last_data_byte - data.len()..last_data_byte],
1082            data.as_slice()
1083        );
1084
1085        Ok(())
1086    }
1087
1088    #[test]
1089    fn test_writer_extend() -> TestResult {
1090        let schema = Schema::parse_str(SCHEMA)?;
1091        let mut writer = Writer::new(&schema, Vec::new())?;
1092
1093        let mut record = Record::new(&schema).unwrap();
1094        record.put("a", 27i64);
1095        record.put("b", "foo");
1096        let record_copy = record.clone();
1097        let records = vec![record, record_copy];
1098
1099        let n1 = writer.extend(records)?;
1100        let n2 = writer.flush()?;
1101        let result = writer.into_inner()?;
1102
1103        assert_eq!(n1 + n2, result.len());
1104
1105        let mut data = Vec::new();
1106        zig_i64(27, &mut data)?;
1107        zig_i64(3, &mut data)?;
1108        data.extend(b"foo");
1109        data.extend(data.clone());
1110
1111        // starts with magic
1112        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1113        // ends with data and sync marker
1114        let last_data_byte = result.len() - 16;
1115        assert_eq!(
1116            &result[last_data_byte - data.len()..last_data_byte],
1117            data.as_slice()
1118        );
1119
1120        Ok(())
1121    }
1122
    /// Serde-serializable test record with a long field `a` and a string field `b`,
    /// used by the `append_ser` / `extend_ser` writer tests.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
1128
1129    #[test]
1130    fn test_writer_append_ser() -> TestResult {
1131        let schema = Schema::parse_str(SCHEMA)?;
1132        let mut writer = Writer::new(&schema, Vec::new())?;
1133
1134        let record = TestSerdeSerialize {
1135            a: 27,
1136            b: "foo".to_owned(),
1137        };
1138
1139        let n1 = writer.append_ser(record)?;
1140        let n2 = writer.flush()?;
1141        let result = writer.into_inner()?;
1142
1143        assert_eq!(n1 + n2, result.len());
1144
1145        let mut data = Vec::new();
1146        zig_i64(27, &mut data)?;
1147        zig_i64(3, &mut data)?;
1148        data.extend(b"foo");
1149
1150        // starts with magic
1151        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1152        // ends with data and sync marker
1153        let last_data_byte = result.len() - 16;
1154        assert_eq!(
1155            &result[last_data_byte - data.len()..last_data_byte],
1156            data.as_slice()
1157        );
1158
1159        Ok(())
1160    }
1161
1162    #[test]
1163    fn test_writer_extend_ser() -> TestResult {
1164        let schema = Schema::parse_str(SCHEMA)?;
1165        let mut writer = Writer::new(&schema, Vec::new())?;
1166
1167        let record = TestSerdeSerialize {
1168            a: 27,
1169            b: "foo".to_owned(),
1170        };
1171        let record_copy = record.clone();
1172        let records = vec![record, record_copy];
1173
1174        let n1 = writer.extend_ser(records)?;
1175        let n2 = writer.flush()?;
1176        let result = writer.into_inner()?;
1177
1178        assert_eq!(n1 + n2, result.len());
1179
1180        let mut data = Vec::new();
1181        zig_i64(27, &mut data)?;
1182        zig_i64(3, &mut data)?;
1183        data.extend(b"foo");
1184        data.extend(data.clone());
1185
1186        // starts with magic
1187        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1188        // ends with data and sync marker
1189        let last_data_byte = result.len() - 16;
1190        assert_eq!(
1191            &result[last_data_byte - data.len()..last_data_byte],
1192            data.as_slice()
1193        );
1194
1195        Ok(())
1196    }
1197
1198    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1199        Writer::with_codec(
1200            schema,
1201            Vec::new(),
1202            Codec::Deflate(DeflateSettings::default()),
1203        )
1204    }
1205
1206    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1207        Writer::builder()
1208            .writer(Vec::new())
1209            .schema(schema)
1210            .codec(Codec::Deflate(DeflateSettings::default()))
1211            .block_size(100)
1212            .build()
1213    }
1214
1215    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1216        let mut record = Record::new(schema).unwrap();
1217        record.put("a", 27i64);
1218        record.put("b", "foo");
1219
1220        let n1 = writer.append(record.clone())?;
1221        let n2 = writer.append(record.clone())?;
1222        let n3 = writer.flush()?;
1223        let result = writer.into_inner()?;
1224
1225        assert_eq!(n1 + n2 + n3, result.len());
1226
1227        let mut data = Vec::new();
1228        zig_i64(27, &mut data)?;
1229        zig_i64(3, &mut data)?;
1230        data.extend(b"foo");
1231        data.extend(data.clone());
1232        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1233
1234        // starts with magic
1235        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1236        // ends with data and sync marker
1237        let last_data_byte = result.len() - 16;
1238        assert_eq!(
1239            &result[last_data_byte - data.len()..last_data_byte],
1240            data.as_slice()
1241        );
1242
1243        Ok(())
1244    }
1245
1246    #[test]
1247    fn test_writer_with_codec() -> TestResult {
1248        let schema = Schema::parse_str(SCHEMA)?;
1249        let writer = make_writer_with_codec(&schema)?;
1250        check_writer(writer, &schema)
1251    }
1252
1253    #[test]
1254    fn test_writer_with_builder() -> TestResult {
1255        let schema = Schema::parse_str(SCHEMA)?;
1256        let writer = make_writer_with_builder(&schema)?;
1257        check_writer(writer, &schema)
1258    }
1259
1260    #[test]
1261    fn test_logical_writer() -> TestResult {
1262        const LOGICAL_TYPE_SCHEMA: &str = r#"
1263        {
1264          "type": "record",
1265          "name": "logical_type_test",
1266          "fields": [
1267            {
1268              "name": "a",
1269              "type": [
1270                "null",
1271                {
1272                  "type": "long",
1273                  "logicalType": "timestamp-micros"
1274                }
1275              ]
1276            }
1277          ]
1278        }
1279        "#;
1280        let codec = Codec::Deflate(DeflateSettings::default());
1281        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1282        let mut writer = Writer::builder()
1283            .schema(&schema)
1284            .codec(codec)
1285            .writer(Vec::new())
1286            .build()?;
1287
1288        let mut record1 = Record::new(&schema).unwrap();
1289        record1.put(
1290            "a",
1291            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1292        );
1293
1294        let mut record2 = Record::new(&schema).unwrap();
1295        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1296
1297        let n1 = writer.append(record1)?;
1298        let n2 = writer.append(record2)?;
1299        let n3 = writer.flush()?;
1300        let result = writer.into_inner()?;
1301
1302        assert_eq!(n1 + n2 + n3, result.len());
1303
1304        let mut data = Vec::new();
1305        // byte indicating not null
1306        zig_i64(1, &mut data)?;
1307        zig_i64(1234, &mut data)?;
1308
1309        // byte indicating null
1310        zig_i64(0, &mut data)?;
1311        codec.compress(&mut data)?;
1312
1313        // starts with magic
1314        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1315        // ends with data and sync marker
1316        let last_data_byte = result.len() - 16;
1317        assert_eq!(
1318            &result[last_data_byte - data.len()..last_data_byte],
1319            data.as_slice()
1320        );
1321
1322        Ok(())
1323    }
1324
1325    #[test]
1326    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1327        let schema = Schema::parse_str(SCHEMA)?;
1328        let mut writer = Writer::new(&schema, Vec::new())?;
1329
1330        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1331        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1332        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1333        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1334
1335        let mut record = Record::new(&schema).unwrap();
1336        record.put("a", 27i64);
1337        record.put("b", "foo");
1338
1339        writer.append(record.clone())?;
1340        writer.append(record.clone())?;
1341        writer.flush()?;
1342        let result = writer.into_inner()?;
1343
1344        assert_eq!(result.len(), 260);
1345
1346        Ok(())
1347    }
1348
1349    #[test]
1350    fn test_avro_3881_metadata_empty_body() -> TestResult {
1351        let schema = Schema::parse_str(SCHEMA)?;
1352        let mut writer = Writer::new(&schema, Vec::new())?;
1353        writer.add_user_metadata("a".to_string(), "b")?;
1354        let result = writer.into_inner()?;
1355
1356        let reader = Reader::with_schema(&schema, &result[..])?;
1357        let mut expected = HashMap::new();
1358        expected.insert("a".to_string(), vec![b'b']);
1359        assert_eq!(reader.user_metadata(), &expected);
1360        assert_eq!(reader.into_iter().count(), 0);
1361
1362        Ok(())
1363    }
1364
1365    #[test]
1366    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1367        let schema = Schema::parse_str(SCHEMA)?;
1368        let mut writer = Writer::new(&schema, Vec::new())?;
1369
1370        let mut record = Record::new(&schema).unwrap();
1371        record.put("a", 27i64);
1372        record.put("b", "foo");
1373        writer.append(record.clone())?;
1374
1375        match writer
1376            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1377            .map_err(Error::into_details)
1378        {
1379            Err(e @ Details::FileHeaderAlreadyWritten) => {
1380                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1381            }
1382            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1383            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1384        }
1385
1386        Ok(())
1387    }
1388
1389    #[test]
1390    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1391        let schema = Schema::parse_str(SCHEMA)?;
1392        let mut writer = Writer::new(&schema, Vec::new())?;
1393
1394        let key = "avro.stringKey".to_string();
1395        match writer
1396            .add_user_metadata(key.clone(), "value")
1397            .map_err(Error::into_details)
1398        {
1399            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1400                assert_eq!(
1401                    e.to_string(),
1402                    format!(
1403                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1404                    )
1405                )
1406            }
1407            Err(e) => panic!(
1408                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1409            ),
1410            Ok(_) => {
1411                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1412            }
1413        }
1414
1415        Ok(())
1416    }
1417
1418    #[test]
1419    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1420        let schema = Schema::parse_str(SCHEMA)?;
1421
1422        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1423        user_meta_data.insert(
1424            "stringKey".to_string(),
1425            Value::String("stringValue".to_string()),
1426        );
1427        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1428        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1429
1430        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1431            .writer(Vec::new())
1432            .schema(&schema)
1433            .user_metadata(user_meta_data.clone())
1434            .build()?;
1435
1436        assert_eq!(writer.user_metadata, user_meta_data);
1437
1438        Ok(())
1439    }
1440
    /// Test payload for the single-object writer tests: a long, a double and an array of
    /// strings. Its Avro schema comes from the `AvroSchema` impl that follows.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
1447
1448    impl AvroSchema for TestSingleObjectWriter {
1449        fn get_schema() -> Schema {
1450            let schema = r#"
1451            {
1452                "type":"record",
1453                "name":"TestSingleObjectWrtierSerialize",
1454                "fields":[
1455                    {
1456                        "name":"a",
1457                        "type":"long"
1458                    },
1459                    {
1460                        "name":"b",
1461                        "type":"double"
1462                    },
1463                    {
1464                        "name":"c",
1465                        "type":{
1466                            "type":"array",
1467                            "items":"string"
1468                        }
1469                    }
1470                ]
1471            }
1472            "#;
1473            Schema::parse_str(schema).unwrap()
1474        }
1475    }
1476
1477    impl From<TestSingleObjectWriter> for Value {
1478        fn from(obj: TestSingleObjectWriter) -> Value {
1479            Value::Record(vec![
1480                ("a".into(), obj.a.into()),
1481                ("b".into(), obj.b.into()),
1482                (
1483                    "c".into(),
1484                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1485                ),
1486            ])
1487        }
1488    }
1489
1490    #[test]
1491    fn test_single_object_writer() -> TestResult {
1492        let mut buf: Vec<u8> = Vec::new();
1493        let obj = TestSingleObjectWriter {
1494            a: 300,
1495            b: 34.555,
1496            c: vec!["cat".into(), "dog".into()],
1497        };
1498        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1499            &TestSingleObjectWriter::get_schema(),
1500            1024,
1501        )
1502        .expect("Should resolve schema");
1503        let value = obj.into();
1504        let written_bytes = writer
1505            .write_value_ref(&value, &mut buf)
1506            .expect("Error serializing properly");
1507
1508        assert!(buf.len() > 10, "no bytes written");
1509        assert_eq!(buf.len(), written_bytes);
1510        assert_eq!(buf[0], 0xC3);
1511        assert_eq!(buf[1], 0x01);
1512        assert_eq!(
1513            &buf[2..10],
1514            &TestSingleObjectWriter::get_schema()
1515                .fingerprint::<Rabin>()
1516                .bytes[..]
1517        );
1518        let mut msg_binary = Vec::new();
1519        encode(
1520            &value,
1521            &TestSingleObjectWriter::get_schema(),
1522            &mut msg_binary,
1523        )
1524        .expect("encode should have failed by here as a dependency of any writing");
1525        assert_eq!(&buf[10..], &msg_binary[..]);
1526
1527        Ok(())
1528    }
1529
1530    #[test]
1531    fn test_single_object_writer_with_header_builder() -> TestResult {
1532        let mut buf: Vec<u8> = Vec::new();
1533        let obj = TestSingleObjectWriter {
1534            a: 300,
1535            b: 34.555,
1536            c: vec!["cat".into(), "dog".into()],
1537        };
1538        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1539        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1540        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1541            &TestSingleObjectWriter::get_schema(),
1542            1024,
1543            header_builder,
1544        )
1545        .expect("Should resolve schema");
1546        let value = obj.into();
1547        writer
1548            .write_value_ref(&value, &mut buf)
1549            .expect("Error serializing properly");
1550
1551        assert_eq!(buf[0], 0x03);
1552        assert_eq!(buf[1], 0x00);
1553        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1554        Ok(())
1555    }
1556
1557    #[test]
1558    fn test_writer_parity() -> TestResult {
1559        let obj1 = TestSingleObjectWriter {
1560            a: 300,
1561            b: 34.555,
1562            c: vec!["cat".into(), "dog".into()],
1563        };
1564
1565        let mut buf1: Vec<u8> = Vec::new();
1566        let mut buf2: Vec<u8> = Vec::new();
1567        let mut buf3: Vec<u8> = Vec::new();
1568
1569        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1570            &TestSingleObjectWriter::get_schema(),
1571            1024,
1572        )
1573        .expect("Should resolve schema");
1574        let mut specific_writer =
1575            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1576                .expect("Resolved should pass");
1577        specific_writer
1578            .write(obj1.clone(), &mut buf1)
1579            .expect("Serialization expected");
1580        specific_writer
1581            .write_value(obj1.clone(), &mut buf2)
1582            .expect("Serialization expected");
1583        generic_writer
1584            .write_value(obj1.into(), &mut buf3)
1585            .expect("Serialization expected");
1586        assert_eq!(buf1, buf2);
1587        assert_eq!(buf1, buf3);
1588
1589        Ok(())
1590    }
1591
1592    #[test]
1593    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1594        const SCHEMA: &str = r#"
1595  {
1596      "type": "record",
1597      "name": "Conference",
1598      "fields": [
1599          {"type": "string", "name": "name"},
1600          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1601      ]
1602  }"#;
1603
1604        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1605        pub struct Conference {
1606            pub name: String,
1607            pub time: Option<i64>,
1608        }
1609
1610        let conf = Conference {
1611            name: "RustConf".to_string(),
1612            time: Some(1234567890),
1613        };
1614
1615        let schema = Schema::parse_str(SCHEMA)?;
1616        let mut writer = Writer::new(&schema, Vec::new())?;
1617
1618        let bytes = writer.append_ser(conf)?;
1619
1620        assert_eq!(198, bytes);
1621
1622        Ok(())
1623    }
1624
1625    #[test]
1626    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1627        const SCHEMA: &str = r#"
1628  {
1629      "type": "record",
1630      "name": "Conference",
1631      "fields": [
1632          {"type": "string", "name": "name"},
1633          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1634      ]
1635  }"#;
1636
1637        #[derive(Debug, PartialEq, Clone, Serialize)]
1638        pub struct Conference {
1639            pub name: String,
1640            pub time: Option<f64>, // wrong type: f64 instead of i64
1641        }
1642
1643        let conf = Conference {
1644            name: "RustConf".to_string(),
1645            time: Some(12345678.90),
1646        };
1647
1648        let schema = Schema::parse_str(SCHEMA)?;
1649        let mut writer = Writer::new(&schema, Vec::new())?;
1650
1651        match writer.append_ser(conf) {
1652            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1653            Err(e) => {
1654                assert_eq!(
1655                    e.to_string(),
1656                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
1657                );
1658            }
1659        }
1660        Ok(())
1661    }
1662
1663    #[test]
1664    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1665        const SCHEMA: &str = r#"
1666        {
1667            "type": "record",
1668            "name": "ExampleSchema",
1669            "fields": [
1670                {"name": "exampleField", "type": "string"}
1671            ]
1672        }
1673        "#;
1674
1675        #[derive(Clone, Default)]
1676        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1677
1678        impl TestBuffer {
1679            fn len(&self) -> usize {
1680                self.0.borrow().len()
1681            }
1682        }
1683
1684        impl Write for TestBuffer {
1685            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1686                self.0.borrow_mut().write(buf)
1687            }
1688
1689            fn flush(&mut self) -> std::io::Result<()> {
1690                Ok(())
1691            }
1692        }
1693
1694        let shared_buffer = TestBuffer::default();
1695
1696        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1697
1698        let schema = Schema::parse_str(SCHEMA)?;
1699
1700        let mut writer = Writer::new(&schema, buffered_writer)?;
1701
1702        let mut record = Record::new(writer.schema()).unwrap();
1703        record.put("exampleField", "value");
1704
1705        writer.append(record)?;
1706        writer.flush()?;
1707
1708        assert_eq!(
1709            shared_buffer.len(),
1710            167,
1711            "the test buffer was not fully written to after Writer::flush was called"
1712        );
1713
1714        Ok(())
1715    }
1716}