apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    encode::{encode, encode_internal, encode_to_vec},
21    headers::{HeaderBuilder, RabinFingerprintHeader},
22    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
23    ser_schema::SchemaAwareWriteSerializer,
24    types::Value,
25    AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, marker::PhantomData, ops::RangeInclusive};
29
/// Default value of `Writer::block_size`: the initial capacity of the block buffer and the
/// buffered size, in bytes, at which an append triggers a flush.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Leading bytes of the object container file header: ASCII `Obj` followed by the byte `0x01`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
32
/// Main interface for writing Avro formatted values.
///
/// Appended values are encoded into an in-memory block buffer. The buffer is written to the
/// underlying writer as one block when it reaches `block_size` bytes or when
/// [`flush`](Writer::flush) is called.
#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
    // Schema that appended values are validated and encoded against; also embedded in the header.
    schema: &'a Schema,
    // Destination for the header and the data blocks.
    writer: W,
    // Resolved view of the schema used for name lookups. Not settable through the builder; the
    // constructors fill it in and the `append_*` methods resolve it lazily while it is `None`.
    #[builder(skip)]
    resolved_schema: Option<ResolvedSchema<'a>>,
    // Codec applied to the block buffer on flush; defaults to `Codec::Null`.
    #[builder(default = Codec::Null)]
    codec: Codec,
    // Buffered size in bytes at which an append triggers a flush.
    #[builder(default = DEFAULT_BLOCK_SIZE)]
    block_size: usize,
    // Encoded values waiting to be flushed; pre-allocated with `block_size` capacity.
    #[builder(skip = Vec::with_capacity(block_size))]
    buffer: Vec<u8>,
    // Number of values encoded into `buffer` since the last flush.
    #[builder(skip)]
    num_values: usize,
    // 16-byte sync marker placed in the header and written after every block; generated by
    // `generate_sync_marker` unless provided.
    #[builder(default = generate_sync_marker())]
    marker: [u8; 16],
    // Whether the header is considered written. The `append_to*` constructors set it to `true`
    // so that no header is emitted.
    #[builder(default = false)]
    has_header: bool,
    // User-supplied metadata entries copied into the header.
    #[builder(default)]
    user_metadata: HashMap<String, Value>,
}
55
56impl<'a, W: Write> Writer<'a, W> {
57    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
58    /// to.
59    /// No compression `Codec` will be used.
60    pub fn new(schema: &'a Schema, writer: W) -> Self {
61        Writer::with_codec(schema, writer, Codec::Null)
62    }
63
64    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
65    /// `io::Write` trait to write to.
66    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
67        let mut w = Self::builder()
68            .schema(schema)
69            .writer(writer)
70            .codec(codec)
71            .build();
72        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
73        w
74    }
75
76    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
77    /// `io::Write` trait to write to.
78    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
79    /// be provided in `schemata`.
80    pub fn with_schemata(
81        schema: &'a Schema,
82        schemata: Vec<&'a Schema>,
83        writer: W,
84        codec: Codec,
85    ) -> Self {
86        let mut w = Self::builder()
87            .schema(schema)
88            .writer(writer)
89            .codec(codec)
90            .build();
91        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
92        w
93    }
94
95    /// Creates a `Writer` that will append values to already populated
96    /// `std::io::Write` using the provided `marker`
97    /// No compression `Codec` will be used.
98    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
99        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
100    }
101
102    /// Creates a `Writer` that will append values to already populated
103    /// `std::io::Write` using the provided `marker`
104    pub fn append_to_with_codec(
105        schema: &'a Schema,
106        writer: W,
107        codec: Codec,
108        marker: [u8; 16],
109    ) -> Self {
110        let mut w = Self::builder()
111            .schema(schema)
112            .writer(writer)
113            .codec(codec)
114            .marker(marker)
115            .has_header(true)
116            .build();
117        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
118        w
119    }
120
121    /// Creates a `Writer` that will append values to already populated
122    /// `std::io::Write` using the provided `marker`
123    pub fn append_to_with_codec_schemata(
124        schema: &'a Schema,
125        schemata: Vec<&'a Schema>,
126        writer: W,
127        codec: Codec,
128        marker: [u8; 16],
129    ) -> Self {
130        let mut w = Self::builder()
131            .schema(schema)
132            .writer(writer)
133            .codec(codec)
134            .marker(marker)
135            .has_header(true)
136            .build();
137        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
138        w
139    }
140
141    /// Get a reference to the `Schema` associated to a `Writer`.
142    pub fn schema(&self) -> &'a Schema {
143        self.schema
144    }
145
146    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
147    /// schema validation.
148    ///
149    /// Return the number of bytes written (it might be 0, see below).
150    ///
151    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
152    /// internal buffering for performance reasons. If you want to be sure the value has been
153    /// written, then call [`flush`](Writer::flush).
154    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
155        let n = self.maybe_write_header()?;
156
157        let avro = value.into();
158        self.append_value_ref(&avro).map(|m| m + n)
159    }
160
161    /// Append a compatible value to a `Writer`, also performing schema validation.
162    ///
163    /// Return the number of bytes written (it might be 0, see below).
164    ///
165    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
166    /// internal buffering for performance reasons. If you want to be sure the value has been
167    /// written, then call [`flush`](Writer::flush).
168    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
169        let n = self.maybe_write_header()?;
170
171        // Lazy init for users using the builder pattern with error throwing
172        match self.resolved_schema {
173            Some(ref rs) => {
174                write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
175                self.num_values += 1;
176
177                if self.buffer.len() >= self.block_size {
178                    return self.flush().map(|b| b + n);
179                }
180
181                Ok(n)
182            }
183            None => {
184                let rs = ResolvedSchema::try_from(self.schema)?;
185                self.resolved_schema = Some(rs);
186                self.append_value_ref(value)
187            }
188        }
189    }
190
191    /// Append anything implementing the `Serialize` trait to a `Writer` for
192    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
193    /// validation.
194    ///
195    /// Return the number of bytes written.
196    ///
197    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
198    /// internal buffering for performance reasons. If you want to be sure the value has been
199    /// written, then call [`flush`](Writer::flush).
200    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
201        let n = self.maybe_write_header()?;
202
203        match self.resolved_schema {
204            Some(ref rs) => {
205                let mut serializer = SchemaAwareWriteSerializer::new(
206                    &mut self.buffer,
207                    self.schema,
208                    rs.get_names(),
209                    None,
210                );
211                value.serialize(&mut serializer)?;
212                self.num_values += 1;
213
214                if self.buffer.len() >= self.block_size {
215                    return self.flush().map(|b| b + n);
216                }
217
218                Ok(n)
219            }
220            None => {
221                let rs = ResolvedSchema::try_from(self.schema)?;
222                self.resolved_schema = Some(rs);
223                self.append_ser(value)
224            }
225        }
226    }
227
228    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
229    /// trait), also performing schema validation.
230    ///
231    /// Return the number of bytes written.
232    ///
233    /// **NOTE**: This function forces the written data to be flushed (an implicit
234    /// call to [`flush`](Writer::flush) is performed).
235    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
236    where
237        I: IntoIterator<Item = T>,
238    {
239        /*
240        https://github.com/rust-lang/rfcs/issues/811 :(
241        let mut stream = values
242            .filter_map(|value| value.serialize(&mut self.serializer).ok())
243            .map(|value| value.encode(self.schema))
244            .collect::<Option<Vec<_>>>()
245            .ok_or_else(|| err_msg("value does not match given schema"))?
246            .into_iter()
247            .fold(Vec::new(), |mut acc, stream| {
248                num_values += 1;
249                acc.extend(stream); acc
250            });
251        */
252
253        let mut num_bytes = 0;
254        for value in values {
255            num_bytes += self.append(value)?;
256        }
257        num_bytes += self.flush()?;
258
259        Ok(num_bytes)
260    }
261
262    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
263    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
264    /// validation.
265    ///
266    /// Return the number of bytes written.
267    ///
268    /// **NOTE**: This function forces the written data to be flushed (an implicit
269    /// call to [`flush`](Writer::flush) is performed).
270    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
271    where
272        I: IntoIterator<Item = T>,
273    {
274        /*
275        https://github.com/rust-lang/rfcs/issues/811 :(
276        let mut stream = values
277            .filter_map(|value| value.serialize(&mut self.serializer).ok())
278            .map(|value| value.encode(self.schema))
279            .collect::<Option<Vec<_>>>()
280            .ok_or_else(|| err_msg("value does not match given schema"))?
281            .into_iter()
282            .fold(Vec::new(), |mut acc, stream| {
283                num_values += 1;
284                acc.extend(stream); acc
285            });
286        */
287
288        let mut num_bytes = 0;
289        for value in values {
290            num_bytes += self.append_ser(value)?;
291        }
292        num_bytes += self.flush()?;
293
294        Ok(num_bytes)
295    }
296
297    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
298    /// validation on each value appended.
299    ///
300    /// Return the number of bytes written.
301    ///
302    /// **NOTE**: This function forces the written data to be flushed (an implicit
303    /// call to [`flush`](Writer::flush) is performed).
304    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
305        let mut num_bytes = 0;
306        for value in values {
307            num_bytes += self.append_value_ref(value)?;
308        }
309        num_bytes += self.flush()?;
310
311        Ok(num_bytes)
312    }
313
314    /// Flush the content appended to a `Writer`. Call this function to make sure all the content
315    /// has been written before releasing the `Writer`.
316    ///
317    /// Return the number of bytes written.
318    pub fn flush(&mut self) -> AvroResult<usize> {
319        if self.num_values == 0 {
320            return Ok(0);
321        }
322
323        self.codec.compress(&mut self.buffer)?;
324
325        let num_values = self.num_values;
326        let stream_len = self.buffer.len();
327
328        let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
329            + self.append_raw(&stream_len.into(), &Schema::Long)?
330            + self
331                .writer
332                .write(self.buffer.as_ref())
333                .map_err(Error::WriteBytes)?
334            + self.append_marker()?;
335
336        self.buffer.clear();
337        self.num_values = 0;
338
339        self.writer.flush().map_err(Error::FlushWriter)?;
340
341        Ok(num_bytes)
342    }
343
344    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
345    ///
346    /// **NOTE**: This function forces the written data to be flushed (an implicit
347    /// call to [`flush`](Writer::flush) is performed).
348    pub fn into_inner(mut self) -> AvroResult<W> {
349        self.maybe_write_header()?;
350        self.flush()?;
351        Ok(self.writer)
352    }
353
354    /// Gets a reference to the underlying writer.
355    ///
356    /// **NOTE**: There is likely data still in the buffer. To have all the data
357    /// in the writer call [`flush`](Writer::flush) first.
358    pub fn get_ref(&self) -> &W {
359        &self.writer
360    }
361
362    /// Gets a mutable reference to the underlying writer.
363    ///
364    /// It is inadvisable to directly write to the underlying writer.
365    ///
366    /// **NOTE**: There is likely data still in the buffer. To have all the data
367    /// in the writer call [`flush`](Writer::flush) first.
368    pub fn get_mut(&mut self) -> &mut W {
369        &mut self.writer
370    }
371
372    /// Generate and append synchronization marker to the payload.
373    fn append_marker(&mut self) -> AvroResult<usize> {
374        // using .writer.write directly to avoid mutable borrow of self
375        // with ref borrowing of self.marker
376        self.writer.write(&self.marker).map_err(Error::WriteMarker)
377    }
378
379    /// Append a raw Avro Value to the payload avoiding to encode it again.
380    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
381        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
382    }
383
384    /// Append pure bytes to the payload.
385    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
386        self.writer.write(bytes).map_err(Error::WriteBytes)
387    }
388
389    /// Adds custom metadata to the file.
390    /// This method could be used only before adding the first record to the writer.
391    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
392        if !self.has_header {
393            if key.starts_with("avro.") {
394                return Err(Error::InvalidMetadataKey(key));
395            }
396            self.user_metadata
397                .insert(key, Value::Bytes(value.as_ref().to_vec()));
398            Ok(())
399        } else {
400            Err(Error::FileHeaderAlreadyWritten)
401        }
402    }
403
404    /// Create an Avro header based on schema, codec and sync marker.
405    fn header(&self) -> Result<Vec<u8>, Error> {
406        let schema_bytes = serde_json::to_string(self.schema)
407            .map_err(Error::ConvertJsonToString)?
408            .into_bytes();
409
410        let mut metadata = HashMap::with_capacity(2);
411        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
412        metadata.insert("avro.codec", self.codec.into());
413        match self.codec {
414            #[cfg(feature = "bzip")]
415            Codec::Bzip2(settings) => {
416                metadata.insert(
417                    "avro.codec.compression_level",
418                    Value::Bytes(vec![settings.compression_level]),
419                );
420            }
421            #[cfg(feature = "xz")]
422            Codec::Xz(settings) => {
423                metadata.insert(
424                    "avro.codec.compression_level",
425                    Value::Bytes(vec![settings.compression_level]),
426                );
427            }
428            #[cfg(feature = "zstandard")]
429            Codec::Zstandard(settings) => {
430                metadata.insert(
431                    "avro.codec.compression_level",
432                    Value::Bytes(vec![settings.compression_level]),
433                );
434            }
435            _ => {}
436        }
437
438        for (k, v) in &self.user_metadata {
439            metadata.insert(k.as_str(), v.clone());
440        }
441
442        let mut header = Vec::new();
443        header.extend_from_slice(AVRO_OBJECT_HEADER);
444        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
445        header.extend_from_slice(&self.marker);
446
447        Ok(header)
448    }
449
450    fn maybe_write_header(&mut self) -> AvroResult<usize> {
451        if !self.has_header {
452            let header = self.header()?;
453            let n = self.append_bytes(header.as_ref())?;
454            self.has_header = true;
455            Ok(n)
456        } else {
457            Ok(0)
458        }
459    }
460}
461
462/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
463/// schema validation.
464///
465/// This is an internal function which gets the bytes buffer where to write as parameter instead of
466/// creating a new one like `to_avro_datum`.
467fn write_avro_datum<T: Into<Value>, W: Write>(
468    schema: &Schema,
469    value: T,
470    writer: &mut W,
471) -> Result<(), Error> {
472    let avro = value.into();
473    if !avro.validate(schema) {
474        return Err(Error::Validation);
475    }
476    encode(&avro, schema, writer)?;
477    Ok(())
478}
479
480fn write_avro_datum_schemata<T: Into<Value>>(
481    schema: &Schema,
482    schemata: Vec<&Schema>,
483    value: T,
484    buffer: &mut Vec<u8>,
485) -> AvroResult<usize> {
486    let avro = value.into();
487    let rs = ResolvedSchema::try_from(schemata)?;
488    let names = rs.get_names();
489    let enclosing_namespace = schema.namespace();
490    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
491        return Err(Error::Validation);
492    }
493    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
494}
495
/// Writer that encodes messages according to the single object encoding v1 spec
/// Uses an API similar to the current File Writer
/// Writes all object bytes at once, and drains internal buffer
pub struct GenericSingleObjectWriter {
    // Holds the pre-built header between writes; a value's encoding is appended to it for the
    // duration of a write.
    buffer: Vec<u8>,
    // Owned, resolved copy of the schema used to validate and encode values.
    resolved: ResolvedOwnedSchema,
}
503
504impl GenericSingleObjectWriter {
505    pub fn new_with_capacity(
506        schema: &Schema,
507        initial_buffer_cap: usize,
508    ) -> AvroResult<GenericSingleObjectWriter> {
509        let header_builder = RabinFingerprintHeader::from_schema(schema);
510        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
511    }
512
513    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
514        schema: &Schema,
515        initial_buffer_cap: usize,
516        header_builder: HB,
517    ) -> AvroResult<GenericSingleObjectWriter> {
518        let mut buffer = Vec::with_capacity(initial_buffer_cap);
519        let header = header_builder.build_header();
520        buffer.extend_from_slice(&header);
521
522        Ok(GenericSingleObjectWriter {
523            buffer,
524            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
525        })
526    }
527
528    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
529
530    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
531    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
532        let original_length = self.buffer.len();
533        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
534            Err(Error::IllegalSingleObjectWriterState)
535        } else {
536            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
537            writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
538            let len = self.buffer.len();
539            self.buffer.truncate(original_length);
540            Ok(len)
541        }
542    }
543
544    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
545    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
546        self.write_value_ref(&v, writer)
547    }
548}
549
/// Writer that encodes messages according to the single object encoding v1 spec
///
/// The schema is taken from `T`'s [`AvroSchema`] implementation.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    // Generic writer that holds the header bytes and performs `Value`-based writes.
    inner: GenericSingleObjectWriter,
    // Schema obtained from `T::get_schema()`, used by the serde-based write path.
    schema: Schema,
    // Set once the serde-based write path (`write_ref`) has emitted the header.
    header_written: bool,
    // Ties the writer to `T` without storing a value of that type.
    _model: PhantomData<T>,
}
560
561impl<T> SpecificSingleObjectWriter<T>
562where
563    T: AvroSchema,
564{
565    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
566        let schema = T::get_schema();
567        Ok(SpecificSingleObjectWriter {
568            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
569            schema,
570            header_written: false,
571            _model: PhantomData,
572        })
573    }
574}
575
576impl<T> SpecificSingleObjectWriter<T>
577where
578    T: AvroSchema + Into<Value>,
579{
580    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
581    /// of bytes written including the header
582    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
583        let v: Value = data.into();
584        self.inner.write_value_ref(&v, writer)
585    }
586}
587
588impl<T> SpecificSingleObjectWriter<T>
589where
590    T: AvroSchema + Serialize,
591{
592    /// Write the referenced `Serialize` object to the provided `Write` object. Returns a result with
593    /// the number of bytes written including the header
594    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
595        let mut bytes_written: usize = 0;
596
597        if !self.header_written {
598            bytes_written += writer
599                .write(self.inner.buffer.as_slice())
600                .map_err(Error::WriteBytes)?;
601            self.header_written = true;
602        }
603
604        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
605
606        Ok(bytes_written)
607    }
608
609    /// Write the Serialize object to the provided Write object. Returns a result with the number
610    /// of bytes written including the header
611    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
612        self.write_ref(&data, writer)
613    }
614}
615
616fn write_value_ref_resolved(
617    schema: &Schema,
618    resolved_schema: &ResolvedSchema,
619    value: &Value,
620    buffer: &mut Vec<u8>,
621) -> AvroResult<usize> {
622    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
623        Some(reason) => Err(Error::ValidationWithReason {
624            value: value.clone(),
625            schema: Box::new(schema.clone()),
626            reason,
627        }),
628        None => encode_internal(
629            value,
630            schema,
631            resolved_schema.get_names(),
632            &schema.namespace(),
633            buffer,
634        ),
635    }
636}
637
638fn write_value_ref_owned_resolved(
639    resolved_schema: &ResolvedOwnedSchema,
640    value: &Value,
641    buffer: &mut Vec<u8>,
642) -> AvroResult<()> {
643    let root_schema = resolved_schema.get_root_schema();
644    if let Some(reason) = value.validate_internal(
645        root_schema,
646        resolved_schema.get_names(),
647        &root_schema.namespace(),
648    ) {
649        return Err(Error::ValidationWithReason {
650            value: value.clone(),
651            schema: Box::new(root_schema.clone()),
652            reason,
653        });
654    }
655    encode_internal(
656        value,
657        root_schema,
658        resolved_schema.get_names(),
659        &root_schema.namespace(),
660        buffer,
661    )?;
662    Ok(())
663}
664
665/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
666/// performing schema validation.
667///
668/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
669/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
670/// you are doing, instead.
671pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
672    let mut buffer = Vec::new();
673    write_avro_datum(schema, value, &mut buffer)?;
674    Ok(buffer)
675}
676
677/// Write the referenced [Serialize]able object to the provided [Write] object.
678/// Returns a result with the number of bytes written.
679///
680/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
681/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
682/// if you don't know what you are doing, instead.
683pub fn write_avro_datum_ref<T: Serialize, W: Write>(
684    schema: &Schema,
685    data: &T,
686    writer: &mut W,
687) -> AvroResult<usize> {
688    let names: HashMap<Name, &Schema> = HashMap::new();
689    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, &names, None);
690    let bytes_written = data.serialize(&mut serializer)?;
691    Ok(bytes_written)
692}
693
694/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
695/// performing schema validation.
696/// If the provided `schema` is incomplete then its dependencies must be
697/// provided in `schemata`
698pub fn to_avro_datum_schemata<T: Into<Value>>(
699    schema: &Schema,
700    schemata: Vec<&Schema>,
701    value: T,
702) -> AvroResult<Vec<u8>> {
703    let mut buffer = Vec::new();
704    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
705    Ok(buffer)
706}
707
708#[cfg(not(target_arch = "wasm32"))]
709fn generate_sync_marker() -> [u8; 16] {
710    let mut marker = [0_u8; 16];
711    std::iter::repeat_with(rand::random)
712        .take(16)
713        .enumerate()
714        .for_each(|(i, n)| marker[i] = n);
715    marker
716}
717
/// Produce a 16-byte sync marker on `wasm32` from four `quad_rand::rand` draws, each laid out
/// as four big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
728
729#[cfg(test)]
730mod tests {
731    use std::{cell::RefCell, rc::Rc};
732
733    use super::*;
734    use crate::{
735        decimal::Decimal,
736        duration::{Days, Duration, Millis, Months},
737        headers::GlueSchemaUuidHeader,
738        rabin::Rabin,
739        schema::{DecimalSchema, FixedSchema, Name},
740        types::Record,
741        util::zig_i64,
742        Reader,
743    };
744    use pretty_assertions::assert_eq;
745    use serde::{Deserialize, Serialize};
746    use uuid::Uuid;
747
748    use crate::codec::DeflateSettings;
749    use apache_avro_test_helper::TestResult;
750
    // Length in bytes of the container file magic, `AVRO_OBJECT_HEADER`.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    // Record schema shared by the tests: a `long` field `a` (default 42) and a `string`
    // field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    // Union of `null` and `long`, used by the union encoding tests.
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
772
773    #[test]
774    fn test_to_avro_datum() -> TestResult {
775        let schema = Schema::parse_str(SCHEMA)?;
776        let mut record = Record::new(&schema).unwrap();
777        record.put("a", 27i64);
778        record.put("b", "foo");
779
780        let mut expected = Vec::new();
781        zig_i64(27, &mut expected)?;
782        zig_i64(3, &mut expected)?;
783        expected.extend([b'f', b'o', b'o']);
784
785        assert_eq!(to_avro_datum(&schema, record)?, expected);
786
787        Ok(())
788    }
789
790    #[test]
791    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
792        #[derive(Serialize)]
793        struct TestStruct {
794            a: i64,
795            b: String,
796        }
797
798        let schema = Schema::parse_str(SCHEMA)?;
799        let mut writer: Vec<u8> = Vec::new();
800        let data = TestStruct {
801            a: 27,
802            b: "foo".to_string(),
803        };
804
805        let mut expected = Vec::new();
806        zig_i64(27, &mut expected)?;
807        zig_i64(3, &mut expected)?;
808        expected.extend([b'f', b'o', b'o']);
809
810        let bytes = write_avro_datum_ref(&schema, &data, &mut writer)?;
811
812        assert_eq!(bytes, expected.len());
813        assert_eq!(writer, expected);
814
815        Ok(())
816    }
817
818    #[test]
819    fn test_union_not_null() -> TestResult {
820        let schema = Schema::parse_str(UNION_SCHEMA)?;
821        let union = Value::Union(1, Box::new(Value::Long(3)));
822
823        let mut expected = Vec::new();
824        zig_i64(1, &mut expected)?;
825        zig_i64(3, &mut expected)?;
826
827        assert_eq!(to_avro_datum(&schema, union)?, expected);
828
829        Ok(())
830    }
831
832    #[test]
833    fn test_union_null() -> TestResult {
834        let schema = Schema::parse_str(UNION_SCHEMA)?;
835        let union = Value::Union(0, Box::new(Value::Null));
836
837        let mut expected = Vec::new();
838        zig_i64(0, &mut expected)?;
839
840        assert_eq!(to_avro_datum(&schema, union)?, expected);
841
842        Ok(())
843    }
844
845    fn logical_type_test<T: Into<Value> + Clone>(
846        schema_str: &'static str,
847
848        expected_schema: &Schema,
849        value: Value,
850
851        raw_schema: &Schema,
852        raw_value: T,
853    ) -> TestResult {
854        let schema = Schema::parse_str(schema_str)?;
855        assert_eq!(&schema, expected_schema);
856        // The serialized format should be the same as the schema.
857        let ser = to_avro_datum(&schema, value.clone())?;
858        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
859        assert_eq!(ser, raw_ser);
860
861        // Should deserialize from the schema into the logical type.
862        let mut r = ser.as_slice();
863        let de = crate::from_avro_datum(&schema, &mut r, None)?;
864        assert_eq!(de, value);
865        Ok(())
866    }
867
868    #[test]
869    fn date() -> TestResult {
870        logical_type_test(
871            r#"{"type": "int", "logicalType": "date"}"#,
872            &Schema::Date,
873            Value::Date(1_i32),
874            &Schema::Int,
875            1_i32,
876        )
877    }
878
879    #[test]
880    fn time_millis() -> TestResult {
881        logical_type_test(
882            r#"{"type": "int", "logicalType": "time-millis"}"#,
883            &Schema::TimeMillis,
884            Value::TimeMillis(1_i32),
885            &Schema::Int,
886            1_i32,
887        )
888    }
889
890    #[test]
891    fn time_micros() -> TestResult {
892        logical_type_test(
893            r#"{"type": "long", "logicalType": "time-micros"}"#,
894            &Schema::TimeMicros,
895            Value::TimeMicros(1_i64),
896            &Schema::Long,
897            1_i64,
898        )
899    }
900
901    #[test]
902    fn timestamp_millis() -> TestResult {
903        logical_type_test(
904            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
905            &Schema::TimestampMillis,
906            Value::TimestampMillis(1_i64),
907            &Schema::Long,
908            1_i64,
909        )
910    }
911
912    #[test]
913    fn timestamp_micros() -> TestResult {
914        logical_type_test(
915            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
916            &Schema::TimestampMicros,
917            Value::TimestampMicros(1_i64),
918            &Schema::Long,
919            1_i64,
920        )
921    }
922
923    #[test]
924    fn decimal_fixed() -> TestResult {
925        let size = 30;
926        let inner = Schema::Fixed(FixedSchema {
927            name: Name::new("decimal")?,
928            aliases: None,
929            doc: None,
930            size,
931            default: None,
932            attributes: Default::default(),
933        });
934        let value = vec![0u8; size];
935        logical_type_test(
936            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
937            &Schema::Decimal(DecimalSchema {
938                precision: 20,
939                scale: 5,
940                inner: Box::new(inner.clone()),
941            }),
942            Value::Decimal(Decimal::from(value.clone())),
943            &inner,
944            Value::Fixed(size, value),
945        )
946    }
947
948    #[test]
949    fn decimal_bytes() -> TestResult {
950        let inner = Schema::Bytes;
951        let value = vec![0u8; 10];
952        logical_type_test(
953            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
954            &Schema::Decimal(DecimalSchema {
955                precision: 4,
956                scale: 3,
957                inner: Box::new(inner.clone()),
958            }),
959            Value::Decimal(Decimal::from(value.clone())),
960            &inner,
961            value,
962        )
963    }
964
965    #[test]
966    fn duration() -> TestResult {
967        let inner = Schema::Fixed(FixedSchema {
968            name: Name::new("duration")?,
969            aliases: None,
970            doc: None,
971            size: 12,
972            default: None,
973            attributes: Default::default(),
974        });
975        let value = Value::Duration(Duration::new(
976            Months::new(256),
977            Days::new(512),
978            Millis::new(1024),
979        ));
980        logical_type_test(
981            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
982            &Schema::Duration,
983            value,
984            &inner,
985            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
986        )
987    }
988
989    #[test]
990    fn test_writer_append() -> TestResult {
991        let schema = Schema::parse_str(SCHEMA)?;
992        let mut writer = Writer::new(&schema, Vec::new());
993
994        let mut record = Record::new(&schema).unwrap();
995        record.put("a", 27i64);
996        record.put("b", "foo");
997
998        let n1 = writer.append(record.clone())?;
999        let n2 = writer.append(record.clone())?;
1000        let n3 = writer.flush()?;
1001        let result = writer.into_inner()?;
1002
1003        assert_eq!(n1 + n2 + n3, result.len());
1004
1005        let mut data = Vec::new();
1006        zig_i64(27, &mut data)?;
1007        zig_i64(3, &mut data)?;
1008        data.extend(b"foo");
1009        data.extend(data.clone());
1010
1011        // starts with magic
1012        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1013        // ends with data and sync marker
1014        let last_data_byte = result.len() - 16;
1015        assert_eq!(
1016            &result[last_data_byte - data.len()..last_data_byte],
1017            data.as_slice()
1018        );
1019
1020        Ok(())
1021    }
1022
1023    #[test]
1024    fn test_writer_extend() -> TestResult {
1025        let schema = Schema::parse_str(SCHEMA)?;
1026        let mut writer = Writer::new(&schema, Vec::new());
1027
1028        let mut record = Record::new(&schema).unwrap();
1029        record.put("a", 27i64);
1030        record.put("b", "foo");
1031        let record_copy = record.clone();
1032        let records = vec![record, record_copy];
1033
1034        let n1 = writer.extend(records)?;
1035        let n2 = writer.flush()?;
1036        let result = writer.into_inner()?;
1037
1038        assert_eq!(n1 + n2, result.len());
1039
1040        let mut data = Vec::new();
1041        zig_i64(27, &mut data)?;
1042        zig_i64(3, &mut data)?;
1043        data.extend(b"foo");
1044        data.extend(data.clone());
1045
1046        // starts with magic
1047        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1048        // ends with data and sync marker
1049        let last_data_byte = result.len() - 16;
1050        assert_eq!(
1051            &result[last_data_byte - data.len()..last_data_byte],
1052            data.as_slice()
1053        );
1054
1055        Ok(())
1056    }
1057
1058    #[derive(Debug, Clone, Deserialize, Serialize)]
1059    struct TestSerdeSerialize {
1060        a: i64,
1061        b: String,
1062    }
1063
1064    #[test]
1065    fn test_writer_append_ser() -> TestResult {
1066        let schema = Schema::parse_str(SCHEMA)?;
1067        let mut writer = Writer::new(&schema, Vec::new());
1068
1069        let record = TestSerdeSerialize {
1070            a: 27,
1071            b: "foo".to_owned(),
1072        };
1073
1074        let n1 = writer.append_ser(record)?;
1075        let n2 = writer.flush()?;
1076        let result = writer.into_inner()?;
1077
1078        assert_eq!(n1 + n2, result.len());
1079
1080        let mut data = Vec::new();
1081        zig_i64(27, &mut data)?;
1082        zig_i64(3, &mut data)?;
1083        data.extend(b"foo");
1084
1085        // starts with magic
1086        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1087        // ends with data and sync marker
1088        let last_data_byte = result.len() - 16;
1089        assert_eq!(
1090            &result[last_data_byte - data.len()..last_data_byte],
1091            data.as_slice()
1092        );
1093
1094        Ok(())
1095    }
1096
1097    #[test]
1098    fn test_writer_extend_ser() -> TestResult {
1099        let schema = Schema::parse_str(SCHEMA)?;
1100        let mut writer = Writer::new(&schema, Vec::new());
1101
1102        let record = TestSerdeSerialize {
1103            a: 27,
1104            b: "foo".to_owned(),
1105        };
1106        let record_copy = record.clone();
1107        let records = vec![record, record_copy];
1108
1109        let n1 = writer.extend_ser(records)?;
1110        let n2 = writer.flush()?;
1111        let result = writer.into_inner()?;
1112
1113        assert_eq!(n1 + n2, result.len());
1114
1115        let mut data = Vec::new();
1116        zig_i64(27, &mut data)?;
1117        zig_i64(3, &mut data)?;
1118        data.extend(b"foo");
1119        data.extend(data.clone());
1120
1121        // starts with magic
1122        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1123        // ends with data and sync marker
1124        let last_data_byte = result.len() - 16;
1125        assert_eq!(
1126            &result[last_data_byte - data.len()..last_data_byte],
1127            data.as_slice()
1128        );
1129
1130        Ok(())
1131    }
1132
1133    fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1134        Writer::with_codec(
1135            schema,
1136            Vec::new(),
1137            Codec::Deflate(DeflateSettings::default()),
1138        )
1139    }
1140
1141    fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1142        Writer::builder()
1143            .writer(Vec::new())
1144            .schema(schema)
1145            .codec(Codec::Deflate(DeflateSettings::default()))
1146            .block_size(100)
1147            .build()
1148    }
1149
1150    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1151        let mut record = Record::new(schema).unwrap();
1152        record.put("a", 27i64);
1153        record.put("b", "foo");
1154
1155        let n1 = writer.append(record.clone())?;
1156        let n2 = writer.append(record.clone())?;
1157        let n3 = writer.flush()?;
1158        let result = writer.into_inner()?;
1159
1160        assert_eq!(n1 + n2 + n3, result.len());
1161
1162        let mut data = Vec::new();
1163        zig_i64(27, &mut data)?;
1164        zig_i64(3, &mut data)?;
1165        data.extend(b"foo");
1166        data.extend(data.clone());
1167        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1168
1169        // starts with magic
1170        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1171        // ends with data and sync marker
1172        let last_data_byte = result.len() - 16;
1173        assert_eq!(
1174            &result[last_data_byte - data.len()..last_data_byte],
1175            data.as_slice()
1176        );
1177
1178        Ok(())
1179    }
1180
1181    #[test]
1182    fn test_writer_with_codec() -> TestResult {
1183        let schema = Schema::parse_str(SCHEMA)?;
1184        let writer = make_writer_with_codec(&schema);
1185        check_writer(writer, &schema)
1186    }
1187
1188    #[test]
1189    fn test_writer_with_builder() -> TestResult {
1190        let schema = Schema::parse_str(SCHEMA)?;
1191        let writer = make_writer_with_builder(&schema);
1192        check_writer(writer, &schema)
1193    }
1194
1195    #[test]
1196    fn test_logical_writer() -> TestResult {
1197        const LOGICAL_TYPE_SCHEMA: &str = r#"
1198        {
1199          "type": "record",
1200          "name": "logical_type_test",
1201          "fields": [
1202            {
1203              "name": "a",
1204              "type": [
1205                "null",
1206                {
1207                  "type": "long",
1208                  "logicalType": "timestamp-micros"
1209                }
1210              ]
1211            }
1212          ]
1213        }
1214        "#;
1215        let codec = Codec::Deflate(DeflateSettings::default());
1216        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1217        let mut writer = Writer::builder()
1218            .schema(&schema)
1219            .codec(codec)
1220            .writer(Vec::new())
1221            .build();
1222
1223        let mut record1 = Record::new(&schema).unwrap();
1224        record1.put(
1225            "a",
1226            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1227        );
1228
1229        let mut record2 = Record::new(&schema).unwrap();
1230        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1231
1232        let n1 = writer.append(record1)?;
1233        let n2 = writer.append(record2)?;
1234        let n3 = writer.flush()?;
1235        let result = writer.into_inner()?;
1236
1237        assert_eq!(n1 + n2 + n3, result.len());
1238
1239        let mut data = Vec::new();
1240        // byte indicating not null
1241        zig_i64(1, &mut data)?;
1242        zig_i64(1234, &mut data)?;
1243
1244        // byte indicating null
1245        zig_i64(0, &mut data)?;
1246        codec.compress(&mut data)?;
1247
1248        // starts with magic
1249        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1250        // ends with data and sync marker
1251        let last_data_byte = result.len() - 16;
1252        assert_eq!(
1253            &result[last_data_byte - data.len()..last_data_byte],
1254            data.as_slice()
1255        );
1256
1257        Ok(())
1258    }
1259
1260    #[test]
1261    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1262        let schema = Schema::parse_str(SCHEMA)?;
1263        let mut writer = Writer::new(&schema, Vec::new());
1264
1265        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1266        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1267        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1268        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1269
1270        let mut record = Record::new(&schema).unwrap();
1271        record.put("a", 27i64);
1272        record.put("b", "foo");
1273
1274        writer.append(record.clone())?;
1275        writer.append(record.clone())?;
1276        writer.flush()?;
1277        let result = writer.into_inner()?;
1278
1279        assert_eq!(result.len(), 260);
1280
1281        Ok(())
1282    }
1283
1284    #[test]
1285    fn test_avro_3881_metadata_empty_body() -> TestResult {
1286        let schema = Schema::parse_str(SCHEMA)?;
1287        let mut writer = Writer::new(&schema, Vec::new());
1288        writer.add_user_metadata("a".to_string(), "b")?;
1289        let result = writer.into_inner()?;
1290
1291        let reader = Reader::with_schema(&schema, &result[..])?;
1292        let mut expected = HashMap::new();
1293        expected.insert("a".to_string(), vec![b'b']);
1294        assert_eq!(reader.user_metadata(), &expected);
1295        assert_eq!(reader.into_iter().count(), 0);
1296
1297        Ok(())
1298    }
1299
1300    #[test]
1301    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1302        let schema = Schema::parse_str(SCHEMA)?;
1303        let mut writer = Writer::new(&schema, Vec::new());
1304
1305        let mut record = Record::new(&schema).unwrap();
1306        record.put("a", 27i64);
1307        record.put("b", "foo");
1308        writer.append(record.clone())?;
1309
1310        match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1311            Err(e @ Error::FileHeaderAlreadyWritten) => {
1312                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1313            }
1314            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1315            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1316        }
1317
1318        Ok(())
1319    }
1320
1321    #[test]
1322    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1323        let schema = Schema::parse_str(SCHEMA)?;
1324        let mut writer = Writer::new(&schema, Vec::new());
1325
1326        let key = "avro.stringKey".to_string();
1327        match writer.add_user_metadata(key.clone(), "value") {
1328            Err(ref e @ Error::InvalidMetadataKey(_)) => {
1329                assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1330            }
1331            Err(e) => panic!(
1332                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1333            ),
1334            Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1335        }
1336
1337        Ok(())
1338    }
1339
1340    #[test]
1341    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1342        let schema = Schema::parse_str(SCHEMA)?;
1343
1344        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1345        user_meta_data.insert(
1346            "stringKey".to_string(),
1347            Value::String("stringValue".to_string()),
1348        );
1349        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1350        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1351
1352        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1353            .writer(Vec::new())
1354            .schema(&schema)
1355            .user_metadata(user_meta_data.clone())
1356            .build();
1357
1358        assert_eq!(writer.user_metadata, user_meta_data);
1359
1360        Ok(())
1361    }
1362
1363    #[derive(Serialize, Clone)]
1364    struct TestSingleObjectWriter {
1365        a: i64,
1366        b: f64,
1367        c: Vec<String>,
1368    }
1369
1370    impl AvroSchema for TestSingleObjectWriter {
1371        fn get_schema() -> Schema {
1372            let schema = r#"
1373            {
1374                "type":"record",
1375                "name":"TestSingleObjectWrtierSerialize",
1376                "fields":[
1377                    {
1378                        "name":"a",
1379                        "type":"long"
1380                    },
1381                    {
1382                        "name":"b",
1383                        "type":"double"
1384                    },
1385                    {
1386                        "name":"c",
1387                        "type":{
1388                            "type":"array",
1389                            "items":"string"
1390                        }
1391                    }
1392                ]
1393            }
1394            "#;
1395            Schema::parse_str(schema).unwrap()
1396        }
1397    }
1398
1399    impl From<TestSingleObjectWriter> for Value {
1400        fn from(obj: TestSingleObjectWriter) -> Value {
1401            Value::Record(vec![
1402                ("a".into(), obj.a.into()),
1403                ("b".into(), obj.b.into()),
1404                (
1405                    "c".into(),
1406                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1407                ),
1408            ])
1409        }
1410    }
1411
1412    #[test]
1413    fn test_single_object_writer() -> TestResult {
1414        let mut buf: Vec<u8> = Vec::new();
1415        let obj = TestSingleObjectWriter {
1416            a: 300,
1417            b: 34.555,
1418            c: vec!["cat".into(), "dog".into()],
1419        };
1420        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1421            &TestSingleObjectWriter::get_schema(),
1422            1024,
1423        )
1424        .expect("Should resolve schema");
1425        let value = obj.into();
1426        let written_bytes = writer
1427            .write_value_ref(&value, &mut buf)
1428            .expect("Error serializing properly");
1429
1430        assert!(buf.len() > 10, "no bytes written");
1431        assert_eq!(buf.len(), written_bytes);
1432        assert_eq!(buf[0], 0xC3);
1433        assert_eq!(buf[1], 0x01);
1434        assert_eq!(
1435            &buf[2..10],
1436            &TestSingleObjectWriter::get_schema()
1437                .fingerprint::<Rabin>()
1438                .bytes[..]
1439        );
1440        let mut msg_binary = Vec::new();
1441        encode(
1442            &value,
1443            &TestSingleObjectWriter::get_schema(),
1444            &mut msg_binary,
1445        )
1446        .expect("encode should have failed by here as a dependency of any writing");
1447        assert_eq!(&buf[10..], &msg_binary[..]);
1448
1449        Ok(())
1450    }
1451
1452    #[test]
1453    fn test_single_object_writer_with_header_builder() -> TestResult {
1454        let mut buf: Vec<u8> = Vec::new();
1455        let obj = TestSingleObjectWriter {
1456            a: 300,
1457            b: 34.555,
1458            c: vec!["cat".into(), "dog".into()],
1459        };
1460        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1461        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1462        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1463            &TestSingleObjectWriter::get_schema(),
1464            1024,
1465            header_builder,
1466        )
1467        .expect("Should resolve schema");
1468        let value = obj.into();
1469        writer
1470            .write_value_ref(&value, &mut buf)
1471            .expect("Error serializing properly");
1472
1473        assert_eq!(buf[0], 0x03);
1474        assert_eq!(buf[1], 0x00);
1475        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1476        Ok(())
1477    }
1478
1479    #[test]
1480    fn test_writer_parity() -> TestResult {
1481        let obj1 = TestSingleObjectWriter {
1482            a: 300,
1483            b: 34.555,
1484            c: vec!["cat".into(), "dog".into()],
1485        };
1486
1487        let mut buf1: Vec<u8> = Vec::new();
1488        let mut buf2: Vec<u8> = Vec::new();
1489        let mut buf3: Vec<u8> = Vec::new();
1490
1491        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1492            &TestSingleObjectWriter::get_schema(),
1493            1024,
1494        )
1495        .expect("Should resolve schema");
1496        let mut specific_writer =
1497            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1498                .expect("Resolved should pass");
1499        specific_writer
1500            .write(obj1.clone(), &mut buf1)
1501            .expect("Serialization expected");
1502        specific_writer
1503            .write_value(obj1.clone(), &mut buf2)
1504            .expect("Serialization expected");
1505        generic_writer
1506            .write_value(obj1.into(), &mut buf3)
1507            .expect("Serialization expected");
1508        assert_eq!(buf1, buf2);
1509        assert_eq!(buf1, buf3);
1510
1511        Ok(())
1512    }
1513
1514    #[test]
1515    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1516        const SCHEMA: &str = r#"
1517  {
1518      "type": "record",
1519      "name": "Conference",
1520      "fields": [
1521          {"type": "string", "name": "name"},
1522          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1523      ]
1524  }"#;
1525
1526        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1527        pub struct Conference {
1528            pub name: String,
1529            pub time: Option<i64>,
1530        }
1531
1532        let conf = Conference {
1533            name: "RustConf".to_string(),
1534            time: Some(1234567890),
1535        };
1536
1537        let schema = Schema::parse_str(SCHEMA)?;
1538        let mut writer = Writer::new(&schema, Vec::new());
1539
1540        let bytes = writer.append_ser(conf)?;
1541
1542        assert_eq!(198, bytes);
1543
1544        Ok(())
1545    }
1546
1547    #[test]
1548    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1549        const SCHEMA: &str = r#"
1550  {
1551      "type": "record",
1552      "name": "Conference",
1553      "fields": [
1554          {"type": "string", "name": "name"},
1555          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1556      ]
1557  }"#;
1558
1559        #[derive(Debug, PartialEq, Clone, Serialize)]
1560        pub struct Conference {
1561            pub name: String,
1562            pub time: Option<f64>, // wrong type: f64 instead of i64
1563        }
1564
1565        let conf = Conference {
1566            name: "RustConf".to_string(),
1567            time: Some(12345678.90),
1568        };
1569
1570        let schema = Schema::parse_str(SCHEMA)?;
1571        let mut writer = Writer::new(&schema, Vec::new());
1572
1573        match writer.append_ser(conf) {
1574            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1575            Err(e) => {
1576                assert_eq!(
1577                    e.to_string(),
1578                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
1579                );
1580            }
1581        }
1582        Ok(())
1583    }
1584
1585    #[test]
1586    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1587        const SCHEMA: &str = r#"
1588        {
1589            "type": "record",
1590            "name": "ExampleSchema",
1591            "fields": [
1592                {"name": "exampleField", "type": "string"}
1593            ]
1594        }
1595        "#;
1596
1597        #[derive(Clone, Default)]
1598        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1599
1600        impl TestBuffer {
1601            fn len(&self) -> usize {
1602                self.0.borrow().len()
1603            }
1604        }
1605
1606        impl Write for TestBuffer {
1607            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1608                self.0.borrow_mut().write(buf)
1609            }
1610
1611            fn flush(&mut self) -> std::io::Result<()> {
1612                Ok(())
1613            }
1614        }
1615
1616        let shared_buffer = TestBuffer::default();
1617
1618        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1619
1620        let schema = Schema::parse_str(SCHEMA)?;
1621
1622        let mut writer = Writer::new(&schema, buffered_writer);
1623
1624        let mut record = Record::new(writer.schema()).unwrap();
1625        record.put("exampleField", "value");
1626
1627        writer.append(record)?;
1628        writer.flush()?;
1629
1630        assert_eq!(
1631            shared_buffer.len(),
1632            167,
1633            "the test buffer was not fully written to after Writer::flush was called"
1634        );
1635
1636        Ok(())
1637    }
1638}