apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! User-level logic for writing values in the Avro format.
19use crate::{
20    encode::{encode, encode_internal, encode_to_vec},
21    headers::{HeaderBuilder, RabinFingerprintHeader},
22    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
23    ser_schema::SchemaAwareWriteSerializer,
24    types::Value,
25    AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{
29    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
30};
31
/// Default `block_size` of a [`Writer`]: once this many encoded bytes are buffered, the
/// buffer is flushed as one data block.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes that open every Avro object container file: `O`, `b`, `j`, then version `1`.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
34
/// Main interface for writing Avro formatted values.
///
/// Appended values are validated, encoded into an in-memory buffer and written to the
/// underlying `writer` as one data block (value count, payload length, payload compressed with
/// `codec`, sync marker) whenever the buffer grows to `block_size` bytes or
/// [`flush`](Writer::flush) is called. The file header is written lazily, before the first value,
/// unless the writer was created to append to an existing file.
///
/// Besides the constructors, a `Writer` can be created with the generated `Writer::builder()`;
/// in that case the schema is resolved lazily, on the first append.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty, so dropping will not try to write any buffered
/// values (it may still write the header if it has not been written yet).
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
    /// Schema every appended value is validated against and encoded with.
    schema: &'a Schema,
    /// Destination the header and the data blocks are written to.
    writer: W,
    // Named types of `schema`; `None` when built via the builder (or when resolution failed at
    // construction), in which case it is resolved on the first append.
    #[builder(skip)]
    resolved_schema: Option<ResolvedSchema<'a>>,
    /// Compression applied to every data block; also recorded in the header as `avro.codec`.
    #[builder(default = Codec::Null)]
    codec: Codec,
    /// Buffered byte count at or above which an append automatically flushes the block.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    // Encoded values of the current block; compressed in place by `flush`, then cleared.
    #[builder(skip = Vec::with_capacity(block_size))]
    buffer: Vec<u8>,
    // Number of values encoded into `buffer` since the last flush.
    #[builder(skip)]
    num_values: usize,
    /// 16-byte sync marker written in the header and after every data block.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    /// Whether the header is already present in the output (set up front when appending).
    #[builder(default = false)]
    has_header: bool,
    /// Extra key/value pairs added to the header's metadata map.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
61
62impl<'a, W: Write> Writer<'a, W> {
63    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
64    /// to.
65    /// No compression `Codec` will be used.
66    pub fn new(schema: &'a Schema, writer: W) -> Self {
67        Writer::with_codec(schema, writer, Codec::Null)
68    }
69
70    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
71    /// `io::Write` trait to write to.
72    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
73        let mut w = Self::builder()
74            .schema(schema)
75            .writer(writer)
76            .codec(codec)
77            .build();
78        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
79        w
80    }
81
82    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
83    /// `io::Write` trait to write to.
84    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
85    /// be provided in `schemata`.
86    pub fn with_schemata(
87        schema: &'a Schema,
88        schemata: Vec<&'a Schema>,
89        writer: W,
90        codec: Codec,
91    ) -> Self {
92        let mut w = Self::builder()
93            .schema(schema)
94            .writer(writer)
95            .codec(codec)
96            .build();
97        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
98        w
99    }
100
101    /// Creates a `Writer` that will append values to already populated
102    /// `std::io::Write` using the provided `marker`
103    /// No compression `Codec` will be used.
104    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
105        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
106    }
107
108    /// Creates a `Writer` that will append values to already populated
109    /// `std::io::Write` using the provided `marker`
110    pub fn append_to_with_codec(
111        schema: &'a Schema,
112        writer: W,
113        codec: Codec,
114        marker: [u8; 16],
115    ) -> Self {
116        let mut w = Self::builder()
117            .schema(schema)
118            .writer(writer)
119            .codec(codec)
120            .marker(marker)
121            .has_header(true)
122            .build();
123        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
124        w
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    pub fn append_to_with_codec_schemata(
130        schema: &'a Schema,
131        schemata: Vec<&'a Schema>,
132        writer: W,
133        codec: Codec,
134        marker: [u8; 16],
135    ) -> Self {
136        let mut w = Self::builder()
137            .schema(schema)
138            .writer(writer)
139            .codec(codec)
140            .marker(marker)
141            .has_header(true)
142            .build();
143        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
144        w
145    }
146
147    /// Get a reference to the `Schema` associated to a `Writer`.
148    pub fn schema(&self) -> &'a Schema {
149        self.schema
150    }
151
152    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
153    /// schema validation.
154    ///
155    /// Return the number of bytes written (it might be 0, see below).
156    ///
157    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
158    /// internal buffering for performance reasons. If you want to be sure the value has been
159    /// written, then call [`flush`](Writer::flush).
160    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
161        let n = self.maybe_write_header()?;
162
163        let avro = value.into();
164        self.append_value_ref(&avro).map(|m| m + n)
165    }
166
167    /// Append a compatible value to a `Writer`, also performing schema validation.
168    ///
169    /// Return the number of bytes written (it might be 0, see below).
170    ///
171    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
172    /// internal buffering for performance reasons. If you want to be sure the value has been
173    /// written, then call [`flush`](Writer::flush).
174    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
175        let n = self.maybe_write_header()?;
176
177        // Lazy init for users using the builder pattern with error throwing
178        match self.resolved_schema {
179            Some(ref rs) => {
180                write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
181                self.num_values += 1;
182
183                if self.buffer.len() >= self.block_size {
184                    return self.flush().map(|b| b + n);
185                }
186
187                Ok(n)
188            }
189            None => {
190                let rs = ResolvedSchema::try_from(self.schema)?;
191                self.resolved_schema = Some(rs);
192                self.append_value_ref(value)
193            }
194        }
195    }
196
197    /// Append anything implementing the `Serialize` trait to a `Writer` for
198    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
199    /// validation.
200    ///
201    /// Return the number of bytes written.
202    ///
203    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
204    /// internal buffering for performance reasons. If you want to be sure the value has been
205    /// written, then call [`flush`](Writer::flush).
206    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
207        let n = self.maybe_write_header()?;
208
209        match self.resolved_schema {
210            Some(ref rs) => {
211                let mut serializer = SchemaAwareWriteSerializer::new(
212                    &mut self.buffer,
213                    self.schema,
214                    rs.get_names(),
215                    None,
216                );
217                value.serialize(&mut serializer)?;
218                self.num_values += 1;
219
220                if self.buffer.len() >= self.block_size {
221                    return self.flush().map(|b| b + n);
222                }
223
224                Ok(n)
225            }
226            None => {
227                let rs = ResolvedSchema::try_from(self.schema)?;
228                self.resolved_schema = Some(rs);
229                self.append_ser(value)
230            }
231        }
232    }
233
234    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
235    /// trait), also performing schema validation.
236    ///
237    /// Return the number of bytes written.
238    ///
239    /// **NOTE**: This function forces the written data to be flushed (an implicit
240    /// call to [`flush`](Writer::flush) is performed).
241    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
242    where
243        I: IntoIterator<Item = T>,
244    {
245        /*
246        https://github.com/rust-lang/rfcs/issues/811 :(
247        let mut stream = values
248            .filter_map(|value| value.serialize(&mut self.serializer).ok())
249            .map(|value| value.encode(self.schema))
250            .collect::<Option<Vec<_>>>()
251            .ok_or_else(|| err_msg("value does not match given schema"))?
252            .into_iter()
253            .fold(Vec::new(), |mut acc, stream| {
254                num_values += 1;
255                acc.extend(stream); acc
256            });
257        */
258
259        let mut num_bytes = 0;
260        for value in values {
261            num_bytes += self.append(value)?;
262        }
263        num_bytes += self.flush()?;
264
265        Ok(num_bytes)
266    }
267
268    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
269    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
270    /// validation.
271    ///
272    /// Return the number of bytes written.
273    ///
274    /// **NOTE**: This function forces the written data to be flushed (an implicit
275    /// call to [`flush`](Writer::flush) is performed).
276    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
277    where
278        I: IntoIterator<Item = T>,
279    {
280        /*
281        https://github.com/rust-lang/rfcs/issues/811 :(
282        let mut stream = values
283            .filter_map(|value| value.serialize(&mut self.serializer).ok())
284            .map(|value| value.encode(self.schema))
285            .collect::<Option<Vec<_>>>()
286            .ok_or_else(|| err_msg("value does not match given schema"))?
287            .into_iter()
288            .fold(Vec::new(), |mut acc, stream| {
289                num_values += 1;
290                acc.extend(stream); acc
291            });
292        */
293
294        let mut num_bytes = 0;
295        for value in values {
296            num_bytes += self.append_ser(value)?;
297        }
298        num_bytes += self.flush()?;
299
300        Ok(num_bytes)
301    }
302
303    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
304    /// validation on each value appended.
305    ///
306    /// Return the number of bytes written.
307    ///
308    /// **NOTE**: This function forces the written data to be flushed (an implicit
309    /// call to [`flush`](Writer::flush) is performed).
310    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
311        let mut num_bytes = 0;
312        for value in values {
313            num_bytes += self.append_value_ref(value)?;
314        }
315        num_bytes += self.flush()?;
316
317        Ok(num_bytes)
318    }
319
320    /// Flush the content appended to a `Writer`. Call this function to make sure all the content
321    /// has been written before releasing the `Writer`.
322    ///
323    /// Return the number of bytes written.
324    pub fn flush(&mut self) -> AvroResult<usize> {
325        if self.num_values == 0 {
326            return Ok(0);
327        }
328
329        self.codec.compress(&mut self.buffer)?;
330
331        let num_values = self.num_values;
332        let stream_len = self.buffer.len();
333
334        let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
335            + self.append_raw(&stream_len.into(), &Schema::Long)?
336            + self
337                .writer
338                .write(self.buffer.as_ref())
339                .map_err(Error::WriteBytes)?
340            + self.append_marker()?;
341
342        self.buffer.clear();
343        self.num_values = 0;
344
345        self.writer.flush().map_err(Error::FlushWriter)?;
346
347        Ok(num_bytes)
348    }
349
    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
    ///
    /// The header is written first if it has not been yet, so the returned writer always contains
    /// one, and then any buffered values are flushed.
    ///
    /// **NOTE**: This function forces the written data to be flushed (an implicit
    /// call to [`flush`](Writer::flush) is performed).
    pub fn into_inner(mut self) -> AvroResult<W> {
        self.maybe_write_header()?;
        self.flush()?;

        // `Writer` implements `Drop`, so `writer` cannot simply be moved out of `self`. Wrapping
        // it in `ManuallyDrop` suppresses that `Drop` impl (which would try to write again) and
        // lets the field be read out by value below.
        let mut this = ManuallyDrop::new(self);

        // Extract every member that is not Copy and therefore should be dropped
        // (inside `ManuallyDrop` their allocations would otherwise be leaked).
        let _resolved_schema = std::mem::take(&mut this.resolved_schema);
        let _buffer = std::mem::take(&mut this.buffer);
        let _user_metadata = std::mem::take(&mut this.user_metadata);

        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
        // `this.writer` is read exactly once and `this` is never used or dropped afterwards, so
        // ownership of the writer moves to the caller.
        let writer = unsafe { std::ptr::read(&this.writer) };

        Ok(writer)
    }
370
371    /// Gets a reference to the underlying writer.
372    ///
373    /// **NOTE**: There is likely data still in the buffer. To have all the data
374    /// in the writer call [`flush`](Writer::flush) first.
375    pub fn get_ref(&self) -> &W {
376        &self.writer
377    }
378
379    /// Gets a mutable reference to the underlying writer.
380    ///
381    /// It is inadvisable to directly write to the underlying writer.
382    ///
383    /// **NOTE**: There is likely data still in the buffer. To have all the data
384    /// in the writer call [`flush`](Writer::flush) first.
385    pub fn get_mut(&mut self) -> &mut W {
386        &mut self.writer
387    }
388
389    /// Generate and append synchronization marker to the payload.
390    fn append_marker(&mut self) -> AvroResult<usize> {
391        // using .writer.write directly to avoid mutable borrow of self
392        // with ref borrowing of self.marker
393        self.writer.write(&self.marker).map_err(Error::WriteMarker)
394    }
395
396    /// Append a raw Avro Value to the payload avoiding to encode it again.
397    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
398        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
399    }
400
401    /// Append pure bytes to the payload.
402    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
403        self.writer.write(bytes).map_err(Error::WriteBytes)
404    }
405
406    /// Adds custom metadata to the file.
407    /// This method could be used only before adding the first record to the writer.
408    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
409        if !self.has_header {
410            if key.starts_with("avro.") {
411                return Err(Error::InvalidMetadataKey(key));
412            }
413            self.user_metadata
414                .insert(key, Value::Bytes(value.as_ref().to_vec()));
415            Ok(())
416        } else {
417            Err(Error::FileHeaderAlreadyWritten)
418        }
419    }
420
421    /// Create an Avro header based on schema, codec and sync marker.
422    fn header(&self) -> Result<Vec<u8>, Error> {
423        let schema_bytes = serde_json::to_string(self.schema)
424            .map_err(Error::ConvertJsonToString)?
425            .into_bytes();
426
427        let mut metadata = HashMap::with_capacity(2);
428        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
429        metadata.insert("avro.codec", self.codec.into());
430        match self.codec {
431            #[cfg(feature = "bzip")]
432            Codec::Bzip2(settings) => {
433                metadata.insert(
434                    "avro.codec.compression_level",
435                    Value::Bytes(vec![settings.compression_level]),
436                );
437            }
438            #[cfg(feature = "xz")]
439            Codec::Xz(settings) => {
440                metadata.insert(
441                    "avro.codec.compression_level",
442                    Value::Bytes(vec![settings.compression_level]),
443                );
444            }
445            #[cfg(feature = "zstandard")]
446            Codec::Zstandard(settings) => {
447                metadata.insert(
448                    "avro.codec.compression_level",
449                    Value::Bytes(vec![settings.compression_level]),
450                );
451            }
452            _ => {}
453        }
454
455        for (k, v) in &self.user_metadata {
456            metadata.insert(k.as_str(), v.clone());
457        }
458
459        let mut header = Vec::new();
460        header.extend_from_slice(AVRO_OBJECT_HEADER);
461        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
462        header.extend_from_slice(&self.marker);
463
464        Ok(header)
465    }
466
467    fn maybe_write_header(&mut self) -> AvroResult<usize> {
468        if !self.has_header {
469            let header = self.header()?;
470            let n = self.append_bytes(header.as_ref())?;
471            self.has_header = true;
472            Ok(n)
473        } else {
474            Ok(0)
475        }
476    }
477}
478
impl<'a, W: Write> Drop for Writer<'a, W> {
    /// Drop the writer, will try to flush ignoring any errors.
    ///
    /// Writes the header first if it is still missing, then flushes any buffered values. Errors
    /// cannot be propagated out of `drop`, so both results are discarded; call
    /// [`flush`](Writer::flush) explicitly to observe them.
    fn drop(&mut self) {
        let _ = self.maybe_write_header();
        let _ = self.flush();
    }
}
486
487/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
488/// schema validation.
489///
490/// This is an internal function which gets the bytes buffer where to write as parameter instead of
491/// creating a new one like `to_avro_datum`.
492fn write_avro_datum<T: Into<Value>, W: Write>(
493    schema: &Schema,
494    value: T,
495    writer: &mut W,
496) -> Result<(), Error> {
497    let avro = value.into();
498    if !avro.validate(schema) {
499        return Err(Error::Validation);
500    }
501    encode(&avro, schema, writer)?;
502    Ok(())
503}
504
505fn write_avro_datum_schemata<T: Into<Value>>(
506    schema: &Schema,
507    schemata: Vec<&Schema>,
508    value: T,
509    buffer: &mut Vec<u8>,
510) -> AvroResult<usize> {
511    let avro = value.into();
512    let rs = ResolvedSchema::try_from(schemata)?;
513    let names = rs.get_names();
514    let enclosing_namespace = schema.namespace();
515    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
516        return Err(Error::Validation);
517    }
518    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
519}
520
/// Writer that encodes messages according to the single object encoding v1 spec.
/// Uses an API similar to the current File Writer.
/// Writes all bytes of an object (header included) at once, then drains the datum from its
/// internal buffer.
pub struct GenericSingleObjectWriter {
    // Holds the single-object header produced by the header builder; each write appends the
    // encoded datum after it and truncates back to the header length afterwards.
    buffer: Vec<u8>,
    // Owned copy of the schema together with its resolved named types.
    resolved: ResolvedOwnedSchema,
}
528
529impl GenericSingleObjectWriter {
530    pub fn new_with_capacity(
531        schema: &Schema,
532        initial_buffer_cap: usize,
533    ) -> AvroResult<GenericSingleObjectWriter> {
534        let header_builder = RabinFingerprintHeader::from_schema(schema);
535        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
536    }
537
538    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
539        schema: &Schema,
540        initial_buffer_cap: usize,
541        header_builder: HB,
542    ) -> AvroResult<GenericSingleObjectWriter> {
543        let mut buffer = Vec::with_capacity(initial_buffer_cap);
544        let header = header_builder.build_header();
545        buffer.extend_from_slice(&header);
546
547        Ok(GenericSingleObjectWriter {
548            buffer,
549            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
550        })
551    }
552
553    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
554
555    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
556    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
557        let original_length = self.buffer.len();
558        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
559            Err(Error::IllegalSingleObjectWriterState)
560        } else {
561            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
562            writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
563            let len = self.buffer.len();
564            self.buffer.truncate(original_length);
565            Ok(len)
566        }
567    }
568
569    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
570    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
571        self.write_value_ref(&v, writer)
572    }
573}
574
575/// Writer that encodes messages according to the single object encoding v1 spec
576pub struct SpecificSingleObjectWriter<T>
577where
578    T: AvroSchema,
579{
580    inner: GenericSingleObjectWriter,
581    schema: Schema,
582    header_written: bool,
583    _model: PhantomData<T>,
584}
585
586impl<T> SpecificSingleObjectWriter<T>
587where
588    T: AvroSchema,
589{
590    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
591        let schema = T::get_schema();
592        Ok(SpecificSingleObjectWriter {
593            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
594            schema,
595            header_written: false,
596            _model: PhantomData,
597        })
598    }
599}
600
601impl<T> SpecificSingleObjectWriter<T>
602where
603    T: AvroSchema + Into<Value>,
604{
605    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
606    /// of bytes written including the header
607    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
608        let v: Value = data.into();
609        self.inner.write_value_ref(&v, writer)
610    }
611}
612
613impl<T> SpecificSingleObjectWriter<T>
614where
615    T: AvroSchema + Serialize,
616{
617    /// Write the referenced `Serialize` object to the provided `Write` object. Returns a result with
618    /// the number of bytes written including the header
619    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
620        let mut bytes_written: usize = 0;
621
622        if !self.header_written {
623            bytes_written += writer
624                .write(self.inner.buffer.as_slice())
625                .map_err(Error::WriteBytes)?;
626            self.header_written = true;
627        }
628
629        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
630
631        Ok(bytes_written)
632    }
633
634    /// Write the Serialize object to the provided Write object. Returns a result with the number
635    /// of bytes written including the header
636    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
637        self.write_ref(&data, writer)
638    }
639}
640
641fn write_value_ref_resolved(
642    schema: &Schema,
643    resolved_schema: &ResolvedSchema,
644    value: &Value,
645    buffer: &mut Vec<u8>,
646) -> AvroResult<usize> {
647    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
648        Some(reason) => Err(Error::ValidationWithReason {
649            value: value.clone(),
650            schema: Box::new(schema.clone()),
651            reason,
652        }),
653        None => encode_internal(
654            value,
655            schema,
656            resolved_schema.get_names(),
657            &schema.namespace(),
658            buffer,
659        ),
660    }
661}
662
663fn write_value_ref_owned_resolved(
664    resolved_schema: &ResolvedOwnedSchema,
665    value: &Value,
666    buffer: &mut Vec<u8>,
667) -> AvroResult<()> {
668    let root_schema = resolved_schema.get_root_schema();
669    if let Some(reason) = value.validate_internal(
670        root_schema,
671        resolved_schema.get_names(),
672        &root_schema.namespace(),
673    ) {
674        return Err(Error::ValidationWithReason {
675            value: value.clone(),
676            schema: Box::new(root_schema.clone()),
677            reason,
678        });
679    }
680    encode_internal(
681        value,
682        root_schema,
683        resolved_schema.get_names(),
684        &root_schema.namespace(),
685        buffer,
686    )?;
687    Ok(())
688}
689
690/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
691/// performing schema validation.
692///
693/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
694/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
695/// you are doing, instead.
696pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
697    let mut buffer = Vec::new();
698    write_avro_datum(schema, value, &mut buffer)?;
699    Ok(buffer)
700}
701
702/// Write the referenced [Serialize]able object to the provided [Write] object.
703/// Returns a result with the number of bytes written.
704///
705/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
706/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
707/// if you don't know what you are doing, instead.
708pub fn write_avro_datum_ref<T: Serialize, W: Write>(
709    schema: &Schema,
710    data: &T,
711    writer: &mut W,
712) -> AvroResult<usize> {
713    let names: HashMap<Name, &Schema> = HashMap::new();
714    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
715    let bytes_written = data.serialize(&mut serializer)?;
716    Ok(bytes_written)
717}
718
719/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
720/// performing schema validation.
721/// If the provided `schema` is incomplete then its dependencies must be
722/// provided in `schemata`
723pub fn to_avro_datum_schemata<T: Into<Value>>(
724    schema: &Schema,
725    schemata: Vec<&Schema>,
726    value: T,
727) -> AvroResult<Vec<u8>> {
728    let mut buffer = Vec::new();
729    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
730    Ok(buffer)
731}
732
733#[cfg(not(target_arch = "wasm32"))]
734fn generate_sync_marker() -> [u8; 16] {
735    let mut marker = [0_u8; 16];
736    std::iter::repeat_with(rand::random)
737        .take(16)
738        .enumerate()
739        .for_each(|(i, n)| marker[i] = n);
740    marker
741}
742
/// Produces a fresh 16-byte sync marker on `wasm32` targets. It uses four `quad_rand::rand`
/// words, each laid out in big-endian byte order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    let words = std::iter::repeat_with(quad_rand::rand);
    for (chunk, word) in marker.chunks_exact_mut(4).zip(words) {
        chunk.copy_from_slice(&word.to_be_bytes());
    }
    marker
}
753
754#[cfg(test)]
755mod tests {
756    use std::{cell::RefCell, rc::Rc};
757
758    use super::*;
759    use crate::{
760        decimal::Decimal,
761        duration::{Days, Duration, Millis, Months},
762        headers::GlueSchemaUuidHeader,
763        rabin::Rabin,
764        schema::{DecimalSchema, FixedSchema, Name},
765        types::Record,
766        util::zig_i64,
767        Reader,
768    };
769    use pretty_assertions::assert_eq;
770    use serde::{Deserialize, Serialize};
771    use uuid::Uuid;
772
773    use crate::codec::DeflateSettings;
774    use apache_avro_test_helper::TestResult;
775
776    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
777
778    const SCHEMA: &str = r#"
779    {
780      "type": "record",
781      "name": "test",
782      "fields": [
783        {
784          "name": "a",
785          "type": "long",
786          "default": 42
787        },
788        {
789          "name": "b",
790          "type": "string"
791        }
792      ]
793    }
794    "#;
795
796    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
797
798    #[test]
799    fn test_to_avro_datum() -> TestResult {
800        let schema = Schema::parse_str(SCHEMA)?;
801        let mut record = Record::new(&schema).unwrap();
802        record.put("a", 27i64);
803        record.put("b", "foo");
804
805        let mut expected = Vec::new();
806        zig_i64(27, &mut expected)?;
807        zig_i64(3, &mut expected)?;
808        expected.extend([b'f', b'o', b'o']);
809
810        assert_eq!(to_avro_datum(&schema, record)?, expected);
811
812        Ok(())
813    }
814
815    #[test]
816    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
817        #[derive(Serialize)]
818        struct TestStruct {
819            a: i64,
820            b: String,
821        }
822
823        let schema = Schema::parse_str(SCHEMA)?;
824        let mut writer: Vec<u8> = Vec::new();
825        let data = TestStruct {
826            a: 27,
827            b: "foo".to_string(),
828        };
829
830        let mut expected = Vec::new();
831        zig_i64(27, &mut expected)?;
832        zig_i64(3, &mut expected)?;
833        expected.extend([b'f', b'o', b'o']);
834
835        let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
836
837        assert_eq!(bytes, expected.len());
838        assert_eq!(writer, expected);
839
840        Ok(())
841    }
842
843    #[test]
844    fn test_union_not_null() -> TestResult {
845        let schema = Schema::parse_str(UNION_SCHEMA)?;
846        let union = Value::Union(1, Box::new(Value::Long(3)));
847
848        let mut expected = Vec::new();
849        zig_i64(1, &mut expected)?;
850        zig_i64(3, &mut expected)?;
851
852        assert_eq!(to_avro_datum(&schema, union)?, expected);
853
854        Ok(())
855    }
856
857    #[test]
858    fn test_union_null() -> TestResult {
859        let schema = Schema::parse_str(UNION_SCHEMA)?;
860        let union = Value::Union(0, Box::new(Value::Null));
861
862        let mut expected = Vec::new();
863        zig_i64(0, &mut expected)?;
864
865        assert_eq!(to_avro_datum(&schema, union)?, expected);
866
867        Ok(())
868    }
869
870    fn logical_type_test<T: Into<Value> + Clone>(
871        schema_str: &'static str,
872
873        expected_schema: &Schema,
874        value: Value,
875
876        raw_schema: &Schema,
877        raw_value: T,
878    ) -> TestResult {
879        let schema = Schema::parse_str(schema_str)?;
880        assert_eq!(&schema, expected_schema);
881        // The serialized format should be the same as the schema.
882        let ser = to_avro_datum(&schema, value.clone())?;
883        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
884        assert_eq!(ser, raw_ser);
885
886        // Should deserialize from the schema into the logical type.
887        let mut r = ser.as_slice();
888        let de = crate::from_avro_datum(&schema, &mut r, None)?;
889        assert_eq!(de, value);
890        Ok(())
891    }
892
893    #[test]
894    fn date() -> TestResult {
895        logical_type_test(
896            r#"{"type": "int", "logicalType": "date"}"#,
897            &Schema::Date,
898            Value::Date(1_i32),
899            &Schema::Int,
900            1_i32,
901        )
902    }
903
904    #[test]
905    fn time_millis() -> TestResult {
906        logical_type_test(
907            r#"{"type": "int", "logicalType": "time-millis"}"#,
908            &Schema::TimeMillis,
909            Value::TimeMillis(1_i32),
910            &Schema::Int,
911            1_i32,
912        )
913    }
914
915    #[test]
916    fn time_micros() -> TestResult {
917        logical_type_test(
918            r#"{"type": "long", "logicalType": "time-micros"}"#,
919            &Schema::TimeMicros,
920            Value::TimeMicros(1_i64),
921            &Schema::Long,
922            1_i64,
923        )
924    }
925
926    #[test]
927    fn timestamp_millis() -> TestResult {
928        logical_type_test(
929            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
930            &Schema::TimestampMillis,
931            Value::TimestampMillis(1_i64),
932            &Schema::Long,
933            1_i64,
934        )
935    }
936
937    #[test]
938    fn timestamp_micros() -> TestResult {
939        logical_type_test(
940            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
941            &Schema::TimestampMicros,
942            Value::TimestampMicros(1_i64),
943            &Schema::Long,
944            1_i64,
945        )
946    }
947
948    #[test]
949    fn decimal_fixed() -> TestResult {
950        let size = 30;
951        let inner = Schema::Fixed(FixedSchema {
952            name: Name::new("decimal")?,
953            aliases: None,
954            doc: None,
955            size,
956            default: None,
957            attributes: Default::default(),
958        });
959        let value = vec![0u8; size];
960        logical_type_test(
961            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
962            &Schema::Decimal(DecimalSchema {
963                precision: 20,
964                scale: 5,
965                inner: Box::new(inner.clone()),
966            }),
967            Value::Decimal(Decimal::from(value.clone())),
968            &inner,
969            Value::Fixed(size, value),
970        )
971    }
972
973    #[test]
974    fn decimal_bytes() -> TestResult {
975        let inner = Schema::Bytes;
976        let value = vec![0u8; 10];
977        logical_type_test(
978            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
979            &Schema::Decimal(DecimalSchema {
980                precision: 4,
981                scale: 3,
982                inner: Box::new(inner.clone()),
983            }),
984            Value::Decimal(Decimal::from(value.clone())),
985            &inner,
986            value,
987        )
988    }
989
990    #[test]
991    fn duration() -> TestResult {
992        let inner = Schema::Fixed(FixedSchema {
993            name: Name::new("duration")?,
994            aliases: None,
995            doc: None,
996            size: 12,
997            default: None,
998            attributes: Default::default(),
999        });
1000        let value = Value::Duration(Duration::new(
1001            Months::new(256),
1002            Days::new(512),
1003            Millis::new(1024),
1004        ));
1005        logical_type_test(
1006            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1007            &Schema::Duration,
1008            value,
1009            &inner,
1010            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1011        )
1012    }
1013
1014    #[test]
1015    fn test_writer_append() -> TestResult {
1016        let schema = Schema::parse_str(SCHEMA)?;
1017        let mut writer = Writer::new(&schema, Vec::new());
1018
1019        let mut record = Record::new(&schema).unwrap();
1020        record.put("a", 27i64);
1021        record.put("b", "foo");
1022
1023        let n1 = writer.append(record.clone())?;
1024        let n2 = writer.append(record.clone())?;
1025        let n3 = writer.flush()?;
1026        let result = writer.into_inner()?;
1027
1028        assert_eq!(n1 + n2 + n3, result.len());
1029
1030        let mut data = Vec::new();
1031        zig_i64(27, &mut data)?;
1032        zig_i64(3, &mut data)?;
1033        data.extend(b"foo");
1034        data.extend(data.clone());
1035
1036        // starts with magic
1037        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1038        // ends with data and sync marker
1039        let last_data_byte = result.len() - 16;
1040        assert_eq!(
1041            &result[last_data_byte - data.len()..last_data_byte],
1042            data.as_slice()
1043        );
1044
1045        Ok(())
1046    }
1047
1048    #[test]
1049    fn test_writer_extend() -> TestResult {
1050        let schema = Schema::parse_str(SCHEMA)?;
1051        let mut writer = Writer::new(&schema, Vec::new());
1052
1053        let mut record = Record::new(&schema).unwrap();
1054        record.put("a", 27i64);
1055        record.put("b", "foo");
1056        let record_copy = record.clone();
1057        let records = vec![record, record_copy];
1058
1059        let n1 = writer.extend(records)?;
1060        let n2 = writer.flush()?;
1061        let result = writer.into_inner()?;
1062
1063        assert_eq!(n1 + n2, result.len());
1064
1065        let mut data = Vec::new();
1066        zig_i64(27, &mut data)?;
1067        zig_i64(3, &mut data)?;
1068        data.extend(b"foo");
1069        data.extend(data.clone());
1070
1071        // starts with magic
1072        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1073        // ends with data and sync marker
1074        let last_data_byte = result.len() - 16;
1075        assert_eq!(
1076            &result[last_data_byte - data.len()..last_data_byte],
1077            data.as_slice()
1078        );
1079
1080        Ok(())
1081    }
1082
1083    #[derive(Debug, Clone, Deserialize, Serialize)]
1084    struct TestSerdeSerialize {
1085        a: i64,
1086        b: String,
1087    }
1088
1089    #[test]
1090    fn test_writer_append_ser() -> TestResult {
1091        let schema = Schema::parse_str(SCHEMA)?;
1092        let mut writer = Writer::new(&schema, Vec::new());
1093
1094        let record = TestSerdeSerialize {
1095            a: 27,
1096            b: "foo".to_owned(),
1097        };
1098
1099        let n1 = writer.append_ser(record)?;
1100        let n2 = writer.flush()?;
1101        let result = writer.into_inner()?;
1102
1103        assert_eq!(n1 + n2, result.len());
1104
1105        let mut data = Vec::new();
1106        zig_i64(27, &mut data)?;
1107        zig_i64(3, &mut data)?;
1108        data.extend(b"foo");
1109
1110        // starts with magic
1111        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1112        // ends with data and sync marker
1113        let last_data_byte = result.len() - 16;
1114        assert_eq!(
1115            &result[last_data_byte - data.len()..last_data_byte],
1116            data.as_slice()
1117        );
1118
1119        Ok(())
1120    }
1121
1122    #[test]
1123    fn test_writer_extend_ser() -> TestResult {
1124        let schema = Schema::parse_str(SCHEMA)?;
1125        let mut writer = Writer::new(&schema, Vec::new());
1126
1127        let record = TestSerdeSerialize {
1128            a: 27,
1129            b: "foo".to_owned(),
1130        };
1131        let record_copy = record.clone();
1132        let records = vec![record, record_copy];
1133
1134        let n1 = writer.extend_ser(records)?;
1135        let n2 = writer.flush()?;
1136        let result = writer.into_inner()?;
1137
1138        assert_eq!(n1 + n2, result.len());
1139
1140        let mut data = Vec::new();
1141        zig_i64(27, &mut data)?;
1142        zig_i64(3, &mut data)?;
1143        data.extend(b"foo");
1144        data.extend(data.clone());
1145
1146        // starts with magic
1147        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1148        // ends with data and sync marker
1149        let last_data_byte = result.len() - 16;
1150        assert_eq!(
1151            &result[last_data_byte - data.len()..last_data_byte],
1152            data.as_slice()
1153        );
1154
1155        Ok(())
1156    }
1157
1158    fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1159        Writer::with_codec(
1160            schema,
1161            Vec::new(),
1162            Codec::Deflate(DeflateSettings::default()),
1163        )
1164    }
1165
1166    fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1167        Writer::builder()
1168            .writer(Vec::new())
1169            .schema(schema)
1170            .codec(Codec::Deflate(DeflateSettings::default()))
1171            .block_size(100)
1172            .build()
1173    }
1174
1175    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1176        let mut record = Record::new(schema).unwrap();
1177        record.put("a", 27i64);
1178        record.put("b", "foo");
1179
1180        let n1 = writer.append(record.clone())?;
1181        let n2 = writer.append(record.clone())?;
1182        let n3 = writer.flush()?;
1183        let result = writer.into_inner()?;
1184
1185        assert_eq!(n1 + n2 + n3, result.len());
1186
1187        let mut data = Vec::new();
1188        zig_i64(27, &mut data)?;
1189        zig_i64(3, &mut data)?;
1190        data.extend(b"foo");
1191        data.extend(data.clone());
1192        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1193
1194        // starts with magic
1195        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1196        // ends with data and sync marker
1197        let last_data_byte = result.len() - 16;
1198        assert_eq!(
1199            &result[last_data_byte - data.len()..last_data_byte],
1200            data.as_slice()
1201        );
1202
1203        Ok(())
1204    }
1205
1206    #[test]
1207    fn test_writer_with_codec() -> TestResult {
1208        let schema = Schema::parse_str(SCHEMA)?;
1209        let writer = make_writer_with_codec(&schema);
1210        check_writer(writer, &schema)
1211    }
1212
1213    #[test]
1214    fn test_writer_with_builder() -> TestResult {
1215        let schema = Schema::parse_str(SCHEMA)?;
1216        let writer = make_writer_with_builder(&schema);
1217        check_writer(writer, &schema)
1218    }
1219
1220    #[test]
1221    fn test_logical_writer() -> TestResult {
1222        const LOGICAL_TYPE_SCHEMA: &str = r#"
1223        {
1224          "type": "record",
1225          "name": "logical_type_test",
1226          "fields": [
1227            {
1228              "name": "a",
1229              "type": [
1230                "null",
1231                {
1232                  "type": "long",
1233                  "logicalType": "timestamp-micros"
1234                }
1235              ]
1236            }
1237          ]
1238        }
1239        "#;
1240        let codec = Codec::Deflate(DeflateSettings::default());
1241        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1242        let mut writer = Writer::builder()
1243            .schema(&schema)
1244            .codec(codec)
1245            .writer(Vec::new())
1246            .build();
1247
1248        let mut record1 = Record::new(&schema).unwrap();
1249        record1.put(
1250            "a",
1251            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1252        );
1253
1254        let mut record2 = Record::new(&schema).unwrap();
1255        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1256
1257        let n1 = writer.append(record1)?;
1258        let n2 = writer.append(record2)?;
1259        let n3 = writer.flush()?;
1260        let result = writer.into_inner()?;
1261
1262        assert_eq!(n1 + n2 + n3, result.len());
1263
1264        let mut data = Vec::new();
1265        // byte indicating not null
1266        zig_i64(1, &mut data)?;
1267        zig_i64(1234, &mut data)?;
1268
1269        // byte indicating null
1270        zig_i64(0, &mut data)?;
1271        codec.compress(&mut data)?;
1272
1273        // starts with magic
1274        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1275        // ends with data and sync marker
1276        let last_data_byte = result.len() - 16;
1277        assert_eq!(
1278            &result[last_data_byte - data.len()..last_data_byte],
1279            data.as_slice()
1280        );
1281
1282        Ok(())
1283    }
1284
1285    #[test]
1286    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1287        let schema = Schema::parse_str(SCHEMA)?;
1288        let mut writer = Writer::new(&schema, Vec::new());
1289
1290        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1291        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1292        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1293        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1294
1295        let mut record = Record::new(&schema).unwrap();
1296        record.put("a", 27i64);
1297        record.put("b", "foo");
1298
1299        writer.append(record.clone())?;
1300        writer.append(record.clone())?;
1301        writer.flush()?;
1302        let result = writer.into_inner()?;
1303
1304        assert_eq!(result.len(), 260);
1305
1306        Ok(())
1307    }
1308
1309    #[test]
1310    fn test_avro_3881_metadata_empty_body() -> TestResult {
1311        let schema = Schema::parse_str(SCHEMA)?;
1312        let mut writer = Writer::new(&schema, Vec::new());
1313        writer.add_user_metadata("a".to_string(), "b")?;
1314        let result = writer.into_inner()?;
1315
1316        let reader = Reader::with_schema(&schema, &result[..])?;
1317        let mut expected = HashMap::new();
1318        expected.insert("a".to_string(), vec![b'b']);
1319        assert_eq!(reader.user_metadata(), &expected);
1320        assert_eq!(reader.into_iter().count(), 0);
1321
1322        Ok(())
1323    }
1324
1325    #[test]
1326    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1327        let schema = Schema::parse_str(SCHEMA)?;
1328        let mut writer = Writer::new(&schema, Vec::new());
1329
1330        let mut record = Record::new(&schema).unwrap();
1331        record.put("a", 27i64);
1332        record.put("b", "foo");
1333        writer.append(record.clone())?;
1334
1335        match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1336            Err(e @ Error::FileHeaderAlreadyWritten) => {
1337                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1338            }
1339            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1340            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1341        }
1342
1343        Ok(())
1344    }
1345
1346    #[test]
1347    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1348        let schema = Schema::parse_str(SCHEMA)?;
1349        let mut writer = Writer::new(&schema, Vec::new());
1350
1351        let key = "avro.stringKey".to_string();
1352        match writer.add_user_metadata(key.clone(), "value") {
1353            Err(ref e @ Error::InvalidMetadataKey(_)) => {
1354                assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1355            }
1356            Err(e) => panic!(
1357                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1358            ),
1359            Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1360        }
1361
1362        Ok(())
1363    }
1364
1365    #[test]
1366    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1367        let schema = Schema::parse_str(SCHEMA)?;
1368
1369        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1370        user_meta_data.insert(
1371            "stringKey".to_string(),
1372            Value::String("stringValue".to_string()),
1373        );
1374        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1375        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1376
1377        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1378            .writer(Vec::new())
1379            .schema(&schema)
1380            .user_metadata(user_meta_data.clone())
1381            .build();
1382
1383        assert_eq!(writer.user_metadata, user_meta_data);
1384
1385        Ok(())
1386    }
1387
1388    #[derive(Serialize, Clone)]
1389    struct TestSingleObjectWriter {
1390        a: i64,
1391        b: f64,
1392        c: Vec<String>,
1393    }
1394
1395    impl AvroSchema for TestSingleObjectWriter {
1396        fn get_schema() -> Schema {
1397            let schema = r#"
1398            {
1399                "type":"record",
1400                "name":"TestSingleObjectWrtierSerialize",
1401                "fields":[
1402                    {
1403                        "name":"a",
1404                        "type":"long"
1405                    },
1406                    {
1407                        "name":"b",
1408                        "type":"double"
1409                    },
1410                    {
1411                        "name":"c",
1412                        "type":{
1413                            "type":"array",
1414                            "items":"string"
1415                        }
1416                    }
1417                ]
1418            }
1419            "#;
1420            Schema::parse_str(schema).unwrap()
1421        }
1422    }
1423
1424    impl From<TestSingleObjectWriter> for Value {
1425        fn from(obj: TestSingleObjectWriter) -> Value {
1426            Value::Record(vec![
1427                ("a".into(), obj.a.into()),
1428                ("b".into(), obj.b.into()),
1429                (
1430                    "c".into(),
1431                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1432                ),
1433            ])
1434        }
1435    }
1436
1437    #[test]
1438    fn test_single_object_writer() -> TestResult {
1439        let mut buf: Vec<u8> = Vec::new();
1440        let obj = TestSingleObjectWriter {
1441            a: 300,
1442            b: 34.555,
1443            c: vec!["cat".into(), "dog".into()],
1444        };
1445        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1446            &TestSingleObjectWriter::get_schema(),
1447            1024,
1448        )
1449        .expect("Should resolve schema");
1450        let value = obj.into();
1451        let written_bytes = writer
1452            .write_value_ref(&value, &mut buf)
1453            .expect("Error serializing properly");
1454
1455        assert!(buf.len() > 10, "no bytes written");
1456        assert_eq!(buf.len(), written_bytes);
1457        assert_eq!(buf[0], 0xC3);
1458        assert_eq!(buf[1], 0x01);
1459        assert_eq!(
1460            &buf[2..10],
1461            &TestSingleObjectWriter::get_schema()
1462                .fingerprint::<Rabin>()
1463                .bytes[..]
1464        );
1465        let mut msg_binary = Vec::new();
1466        encode(
1467            &value,
1468            &TestSingleObjectWriter::get_schema(),
1469            &mut msg_binary,
1470        )
1471        .expect("encode should have failed by here as a dependency of any writing");
1472        assert_eq!(&buf[10..], &msg_binary[..]);
1473
1474        Ok(())
1475    }
1476
1477    #[test]
1478    fn test_single_object_writer_with_header_builder() -> TestResult {
1479        let mut buf: Vec<u8> = Vec::new();
1480        let obj = TestSingleObjectWriter {
1481            a: 300,
1482            b: 34.555,
1483            c: vec!["cat".into(), "dog".into()],
1484        };
1485        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1486        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1487        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1488            &TestSingleObjectWriter::get_schema(),
1489            1024,
1490            header_builder,
1491        )
1492        .expect("Should resolve schema");
1493        let value = obj.into();
1494        writer
1495            .write_value_ref(&value, &mut buf)
1496            .expect("Error serializing properly");
1497
1498        assert_eq!(buf[0], 0x03);
1499        assert_eq!(buf[1], 0x00);
1500        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1501        Ok(())
1502    }
1503
1504    #[test]
1505    fn test_writer_parity() -> TestResult {
1506        let obj1 = TestSingleObjectWriter {
1507            a: 300,
1508            b: 34.555,
1509            c: vec!["cat".into(), "dog".into()],
1510        };
1511
1512        let mut buf1: Vec<u8> = Vec::new();
1513        let mut buf2: Vec<u8> = Vec::new();
1514        let mut buf3: Vec<u8> = Vec::new();
1515
1516        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1517            &TestSingleObjectWriter::get_schema(),
1518            1024,
1519        )
1520        .expect("Should resolve schema");
1521        let mut specific_writer =
1522            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1523                .expect("Resolved should pass");
1524        specific_writer
1525            .write(obj1.clone(), &mut buf1)
1526            .expect("Serialization expected");
1527        specific_writer
1528            .write_value(obj1.clone(), &mut buf2)
1529            .expect("Serialization expected");
1530        generic_writer
1531            .write_value(obj1.into(), &mut buf3)
1532            .expect("Serialization expected");
1533        assert_eq!(buf1, buf2);
1534        assert_eq!(buf1, buf3);
1535
1536        Ok(())
1537    }
1538
1539    #[test]
1540    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1541        const SCHEMA: &str = r#"
1542  {
1543      "type": "record",
1544      "name": "Conference",
1545      "fields": [
1546          {"type": "string", "name": "name"},
1547          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1548      ]
1549  }"#;
1550
1551        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1552        pub struct Conference {
1553            pub name: String,
1554            pub time: Option<i64>,
1555        }
1556
1557        let conf = Conference {
1558            name: "RustConf".to_string(),
1559            time: Some(1234567890),
1560        };
1561
1562        let schema = Schema::parse_str(SCHEMA)?;
1563        let mut writer = Writer::new(&schema, Vec::new());
1564
1565        let bytes = writer.append_ser(conf)?;
1566
1567        assert_eq!(198, bytes);
1568
1569        Ok(())
1570    }
1571
1572    #[test]
1573    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1574        const SCHEMA: &str = r#"
1575  {
1576      "type": "record",
1577      "name": "Conference",
1578      "fields": [
1579          {"type": "string", "name": "name"},
1580          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1581      ]
1582  }"#;
1583
1584        #[derive(Debug, PartialEq, Clone, Serialize)]
1585        pub struct Conference {
1586            pub name: String,
1587            pub time: Option<f64>, // wrong type: f64 instead of i64
1588        }
1589
1590        let conf = Conference {
1591            name: "RustConf".to_string(),
1592            time: Some(12345678.90),
1593        };
1594
1595        let schema = Schema::parse_str(SCHEMA)?;
1596        let mut writer = Writer::new(&schema, Vec::new());
1597
1598        match writer.append_ser(conf) {
1599            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1600            Err(e) => {
1601                assert_eq!(
1602                    e.to_string(),
1603                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
1604                );
1605            }
1606        }
1607        Ok(())
1608    }
1609
1610    #[test]
1611    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1612        const SCHEMA: &str = r#"
1613        {
1614            "type": "record",
1615            "name": "ExampleSchema",
1616            "fields": [
1617                {"name": "exampleField", "type": "string"}
1618            ]
1619        }
1620        "#;
1621
1622        #[derive(Clone, Default)]
1623        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1624
1625        impl TestBuffer {
1626            fn len(&self) -> usize {
1627                self.0.borrow().len()
1628            }
1629        }
1630
1631        impl Write for TestBuffer {
1632            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1633                self.0.borrow_mut().write(buf)
1634            }
1635
1636            fn flush(&mut self) -> std::io::Result<()> {
1637                Ok(())
1638            }
1639        }
1640
1641        let shared_buffer = TestBuffer::default();
1642
1643        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1644
1645        let schema = Schema::parse_str(SCHEMA)?;
1646
1647        let mut writer = Writer::new(&schema, buffered_writer);
1648
1649        let mut record = Record::new(writer.schema()).unwrap();
1650        record.put("exampleField", "value");
1651
1652        writer.append(record)?;
1653        writer.flush()?;
1654
1655        assert_eq!(
1656            shared_buffer.len(),
1657            167,
1658            "the test buffer was not fully written to after Writer::flush was called"
1659        );
1660
1661        Ok(())
1662    }
1663}