apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    serde::{AvroSchema, ser_schema::SchemaAwareWriteSerializer},
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default for the builder's `block_size`: the buffered byte count at which an append flushes.
const DEFAULT_BLOCK_SIZE: usize = 16000;
/// Leading bytes of the header assembled by `Writer::header`.
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
35
/// Main interface for writing Avro formatted values.
///
/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
pub struct Writer<'a, W: Write> {
    /// Schema that appended values are validated and encoded against.
    schema: &'a Schema,
    /// Destination that receives the header and the data blocks.
    writer: W,
    /// Resolved form of `schema` (or of the `schemata` handed to the builder); supplies the
    /// names used for validation and encoding.
    resolved_schema: ResolvedSchema<'a>,
    /// Codec applied to the buffered block when it is flushed.
    codec: Codec,
    /// Buffered size in bytes at which an append triggers a flush.
    block_size: usize,
    /// Encoded values of the block currently being assembled.
    buffer: Vec<u8>,
    /// Number of values currently held in `buffer`.
    num_values: usize,
    /// Sync marker appended to the header and written after every block.
    marker: [u8; 16],
    /// `true` once the header has been written, or when header writing was disabled.
    has_header: bool,
    /// Additional metadata entries copied into the header.
    user_metadata: HashMap<String, Value>,
}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
    /// Get a reference to the `Schema` associated to a `Writer`.
    ///
    /// The returned reference carries the schema's own lifetime `'a`, not the lifetime of the
    /// borrow of `self`.
    pub fn schema(&self) -> &'a Schema {
        self.schema
    }
174
    /// Deprecated. Use [`Writer::append_value`] instead.
    ///
    /// Forwards to [`Writer::append_value`] unchanged and returns its result.
    #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
        self.append_value(value)
    }
180
181    /// Append a value to the `Writer`, also performs schema validation.
182    ///
183    /// Returns the number of bytes written (it might be 0, see below).
184    ///
185    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
186    /// internal buffering for performance reasons. If you want to be sure the value has been
187    /// written, then call [`flush`](Writer::flush).
188    pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189        let avro = value.into();
190        self.append_value_ref(&avro)
191    }
192
193    /// Append a compatible value to a `Writer`, also performs schema validation.
194    ///
195    /// Returns the number of bytes written (it might be 0, see below).
196    ///
197    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
198    /// internal buffering for performance reasons. If you want to be sure the value has been
199    /// written, then call [`flush`](Writer::flush).
200    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201        if let Some(reason) = value.validate_internal(
202            self.schema,
203            self.resolved_schema.get_names(),
204            &self.schema.namespace(),
205        ) {
206            return Err(Details::ValidationWithReason {
207                value: value.clone(),
208                schema: self.schema.clone(),
209                reason,
210            }
211            .into());
212        }
213        self.unvalidated_append_value_ref(value)
214    }
215
216    /// Append a compatible value to a `Writer`.
217    ///
218    /// This function does **not** validate that the provided value matches the schema. If it does
219    /// not match, the file will contain corrupt data. Use [`Writer::append_value`] to have the
220    /// value validated during write or use [`Value::validate`] to validate the value.
221    ///
222    /// Returns the number of bytes written (it might be 0, see below).
223    ///
224    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
225    /// internal buffering for performance reasons. If you want to be sure the value has been
226    /// written, then call [`flush`](Writer::flush).
227    pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228        let value = value.into();
229        self.unvalidated_append_value_ref(&value)
230    }
231
232    /// Append a compatible value to a `Writer`.
233    ///
234    /// This function does **not** validate that the provided value matches the schema. If it does
235    /// not match, the file will contain corrupt data. Use [`Writer::append_value_ref`] to have the
236    /// value validated during write or use [`Value::validate`] to validate the value.
237    ///
238    /// Returns the number of bytes written (it might be 0, see below).
239    ///
240    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
241    /// internal buffering for performance reasons. If you want to be sure the value has been
242    /// written, then call [`flush`](Writer::flush).
243    pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244        let n = self.maybe_write_header()?;
245        encode_internal(
246            value,
247            self.schema,
248            self.resolved_schema.get_names(),
249            &self.schema.namespace(),
250            &mut self.buffer,
251        )?;
252
253        self.num_values += 1;
254
255        if self.buffer.len() >= self.block_size {
256            return self.flush().map(|b| b + n);
257        }
258
259        Ok(n)
260    }
261
262    /// Append anything implementing the `Serialize` trait to a `Writer` for
263    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
264    /// validation.
265    ///
266    /// Returns the number of bytes written.
267    ///
268    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
269    /// internal buffering for performance reasons. If you want to be sure the value has been
270    /// written, then call [`flush`](Writer::flush).
271    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272        let n = self.maybe_write_header()?;
273
274        let mut serializer = SchemaAwareWriteSerializer::new(
275            &mut self.buffer,
276            self.schema,
277            self.resolved_schema.get_names(),
278            None,
279        );
280        value.serialize(&mut serializer)?;
281        self.num_values += 1;
282
283        if self.buffer.len() >= self.block_size {
284            return self.flush().map(|b| b + n);
285        }
286
287        Ok(n)
288    }
289
290    /// Extend a `Writer` with an `Iterator` of values, also performs schema validation.
291    ///
292    /// Returns the number of bytes written.
293    ///
294    /// **NOTE**: This function forces the written data to be flushed (an implicit
295    /// call to [`flush`](Writer::flush) is performed).
296    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
297    where
298        I: IntoIterator<Item = T>,
299    {
300        /*
301        https://github.com/rust-lang/rfcs/issues/811 :(
302        let mut stream = values
303            .filter_map(|value| value.serialize(&mut self.serializer).ok())
304            .map(|value| value.encode(self.schema))
305            .collect::<Option<Vec<_>>>()
306            .ok_or_else(|| err_msg("value does not match given schema"))?
307            .into_iter()
308            .fold(Vec::new(), |mut acc, stream| {
309                num_values += 1;
310                acc.extend(stream); acc
311            });
312        */
313
314        let mut num_bytes = 0;
315        for value in values {
316            num_bytes += self.append_value(value)?;
317        }
318        num_bytes += self.flush()?;
319
320        Ok(num_bytes)
321    }
322
323    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
324    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
325    /// validation.
326    ///
327    /// Returns the number of bytes written.
328    ///
329    /// **NOTE**: This function forces the written data to be flushed (an implicit
330    /// call to [`flush`](Writer::flush) is performed).
331    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
332    where
333        I: IntoIterator<Item = T>,
334    {
335        /*
336        https://github.com/rust-lang/rfcs/issues/811 :(
337        let mut stream = values
338            .filter_map(|value| value.serialize(&mut self.serializer).ok())
339            .map(|value| value.encode(self.schema))
340            .collect::<Option<Vec<_>>>()
341            .ok_or_else(|| err_msg("value does not match given schema"))?
342            .into_iter()
343            .fold(Vec::new(), |mut acc, stream| {
344                num_values += 1;
345                acc.extend(stream); acc
346            });
347        */
348
349        let mut num_bytes = 0;
350        for value in values {
351            num_bytes += self.append_ser(value)?;
352        }
353        num_bytes += self.flush()?;
354
355        Ok(num_bytes)
356    }
357
358    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
359    /// validation on each value appended.
360    ///
361    /// Returns the number of bytes written.
362    ///
363    /// **NOTE**: This function forces the written data to be flushed (an implicit
364    /// call to [`flush`](Writer::flush) is performed).
365    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
366        let mut num_bytes = 0;
367        for value in values {
368            num_bytes += self.append_value_ref(value)?;
369        }
370        num_bytes += self.flush()?;
371
372        Ok(num_bytes)
373    }
374
375    /// Flush the content to the inner `Writer`.
376    ///
377    /// Call this function to make sure all the content has been written before releasing the `Writer`.
378    /// This will also write the header if it wasn't written yet and hasn't been disabled using
379    /// [`WriterBuilder::has_header`].
380    ///
381    /// Returns the number of bytes written.
382    pub fn flush(&mut self) -> AvroResult<usize> {
383        let mut num_bytes = self.maybe_write_header()?;
384        if self.num_values == 0 {
385            return Ok(num_bytes);
386        }
387
388        self.codec.compress(&mut self.buffer)?;
389
390        let num_values = self.num_values;
391        let stream_len = self.buffer.len();
392
393        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
394            + self.append_raw(&stream_len.into(), &Schema::Long)?
395            + self
396                .writer
397                .write(self.buffer.as_ref())
398                .map_err(Details::WriteBytes)?
399            + self.append_marker()?;
400
401        self.buffer.clear();
402        self.num_values = 0;
403
404        self.writer.flush().map_err(Details::FlushWriter)?;
405
406        Ok(num_bytes)
407    }
408
409    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
410    ///
411    /// **NOTE**: This function forces the written data to be flushed (an implicit
412    /// call to [`flush`](Writer::flush) is performed).
413    pub fn into_inner(mut self) -> AvroResult<W> {
414        self.maybe_write_header()?;
415        self.flush()?;
416
417        let mut this = ManuallyDrop::new(self);
418
419        // Extract every member that is not Copy and therefore should be dropped
420        let _buffer = std::mem::take(&mut this.buffer);
421        let _user_metadata = std::mem::take(&mut this.user_metadata);
422        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
423        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
424
425        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
426        let writer = unsafe { std::ptr::read(&this.writer) };
427
428        Ok(writer)
429    }
430
    /// Gets a reference to the underlying writer.
    ///
    /// This performs no flush: it only borrows the inner writer as it currently is.
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_ref(&self) -> &W {
        &self.writer
    }
438
    /// Gets a mutable reference to the underlying writer.
    ///
    /// It is inadvisable to directly write to the underlying writer.
    ///
    /// This performs no flush: it only borrows the inner writer as it currently is.
    ///
    /// **NOTE**: There is likely data still in the buffer. To have all the data
    /// in the writer call [`flush`](Writer::flush) first.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.writer
    }
448
449    /// Generate and append synchronization marker to the payload.
450    fn append_marker(&mut self) -> AvroResult<usize> {
451        // using .writer.write directly to avoid mutable borrow of self
452        // with ref borrowing of self.marker
453        self.writer
454            .write(&self.marker)
455            .map_err(|e| Details::WriteMarker(e).into())
456    }
457
458    /// Append a raw Avro Value to the payload avoiding to encode it again.
459    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
460        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
461    }
462
463    /// Append pure bytes to the payload.
464    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
465        self.writer
466            .write(bytes)
467            .map_err(|e| Details::WriteBytes(e).into())
468    }
469
470    /// Adds custom metadata to the file.
471    /// This method could be used only before adding the first record to the writer.
472    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
473        if !self.has_header {
474            if key.starts_with("avro.") {
475                return Err(Details::InvalidMetadataKey(key).into());
476            }
477            self.user_metadata
478                .insert(key, Value::Bytes(value.as_ref().to_vec()));
479            Ok(())
480        } else {
481            Err(Details::FileHeaderAlreadyWritten.into())
482        }
483    }
484
485    /// Create an Avro header based on schema, codec and sync marker.
486    fn header(&self) -> Result<Vec<u8>, Error> {
487        let schema_bytes = serde_json::to_string(self.schema)
488            .map_err(Details::ConvertJsonToString)?
489            .into_bytes();
490
491        let mut metadata = HashMap::with_capacity(2);
492        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
493        if self.codec != Codec::Null {
494            metadata.insert("avro.codec", self.codec.into());
495        }
496        match self.codec {
497            #[cfg(feature = "bzip")]
498            Codec::Bzip2(settings) => {
499                metadata.insert(
500                    "avro.codec.compression_level",
501                    Value::Bytes(vec![settings.compression_level]),
502                );
503            }
504            #[cfg(feature = "xz")]
505            Codec::Xz(settings) => {
506                metadata.insert(
507                    "avro.codec.compression_level",
508                    Value::Bytes(vec![settings.compression_level]),
509                );
510            }
511            #[cfg(feature = "zstandard")]
512            Codec::Zstandard(settings) => {
513                metadata.insert(
514                    "avro.codec.compression_level",
515                    Value::Bytes(vec![settings.compression_level]),
516                );
517            }
518            _ => {}
519        }
520
521        for (k, v) in &self.user_metadata {
522            metadata.insert(k.as_str(), v.clone());
523        }
524
525        let mut header = Vec::new();
526        header.extend_from_slice(AVRO_OBJECT_HEADER);
527        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
528        header.extend_from_slice(&self.marker);
529
530        Ok(header)
531    }
532
533    fn maybe_write_header(&mut self) -> AvroResult<usize> {
534        if !self.has_header {
535            let header = self.header()?;
536            let n = self.append_bytes(header.as_ref())?;
537            self.has_header = true;
538            Ok(n)
539        } else {
540            Ok(0)
541        }
542    }
543}
544
545impl<W: Write> Drop for Writer<'_, W> {
546    /// Drop the writer, will try to flush ignoring any errors.
547    fn drop(&mut self) {
548        let _ = self.maybe_write_header();
549        let _ = self.flush();
550    }
551}
552
553/// Encode a value into raw Avro data, also performs schema validation.
554///
555/// This is an internal function which gets the bytes buffer where to write as parameter instead of
556/// creating a new one like `to_avro_datum`.
557fn write_avro_datum<T: Into<Value>, W: Write>(
558    schema: &Schema,
559    value: T,
560    writer: &mut W,
561) -> Result<(), Error> {
562    let avro = value.into();
563    if !avro.validate(schema) {
564        return Err(Details::Validation.into());
565    }
566    encode(&avro, schema, writer)?;
567    Ok(())
568}
569
570fn write_avro_datum_schemata<T: Into<Value>>(
571    schema: &Schema,
572    schemata: Vec<&Schema>,
573    value: T,
574    buffer: &mut Vec<u8>,
575) -> AvroResult<usize> {
576    let avro = value.into();
577    let rs = ResolvedSchema::try_from(schemata)?;
578    let names = rs.get_names();
579    let enclosing_namespace = schema.namespace();
580    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
581        return Err(Details::Validation.into());
582    }
583    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
584}
585
/// Writer that encodes messages according to the single object encoding v1 spec
/// Uses an API similar to the current File Writer
/// Writes all object bytes at once, and drains internal buffer
pub struct GenericSingleObjectWriter {
    /// Holds the message header; each write appends the encoded value after it, sends the
    /// whole buffer and cuts it back to the header.
    buffer: Vec<u8>,
    /// Owned, resolved copy of the schema used for validation and encoding.
    resolved: ResolvedOwnedSchema,
}
593
594impl GenericSingleObjectWriter {
595    pub fn new_with_capacity(
596        schema: &Schema,
597        initial_buffer_cap: usize,
598    ) -> AvroResult<GenericSingleObjectWriter> {
599        let header_builder = RabinFingerprintHeader::from_schema(schema);
600        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
601    }
602
603    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
604        schema: &Schema,
605        initial_buffer_cap: usize,
606        header_builder: HB,
607    ) -> AvroResult<GenericSingleObjectWriter> {
608        let mut buffer = Vec::with_capacity(initial_buffer_cap);
609        let header = header_builder.build_header();
610        buffer.extend_from_slice(&header);
611
612        Ok(GenericSingleObjectWriter {
613            buffer,
614            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
615        })
616    }
617
618    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
619
620    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
621    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
622        let original_length = self.buffer.len();
623        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
624            Err(Details::IllegalSingleObjectWriterState.into())
625        } else {
626            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
627            writer
628                .write_all(&self.buffer)
629                .map_err(Details::WriteBytes)?;
630            let len = self.buffer.len();
631            self.buffer.truncate(original_length);
632            Ok(len)
633        }
634    }
635
636    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
637    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
638        self.write_value_ref(&v, writer)
639    }
640}
641
/// Writer that encodes messages according to the single object encoding v1 spec
///
/// The schema comes from the type parameter via `T::get_schema()`.
pub struct SpecificSingleObjectWriter<T>
where
    T: AvroSchema,
{
    /// Owned, resolved schema of `T`.
    resolved: ResolvedOwnedSchema,
    /// Header bytes written in front of every message.
    header: Vec<u8>,
    /// Ties the writer to `T` without storing a value of it.
    _model: PhantomData<T>,
}
651
652impl<T> SpecificSingleObjectWriter<T>
653where
654    T: AvroSchema,
655{
656    pub fn new() -> AvroResult<Self> {
657        let schema = T::get_schema();
658        let header = RabinFingerprintHeader::from_schema(&schema).build_header();
659        let resolved = ResolvedOwnedSchema::new(schema)?;
660        // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice
661        Ok(Self {
662            resolved,
663            header,
664            _model: PhantomData,
665        })
666    }
667
668    pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
669        let header = header_builder.build_header();
670        let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
671        Ok(Self {
672            resolved,
673            header,
674            _model: PhantomData,
675        })
676    }
677
678    /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead.
679    #[deprecated(since = "0.22.0", note = "Use new() instead")]
680    pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
681        Self::new()
682    }
683}
684
685impl<T> SpecificSingleObjectWriter<T>
686where
687    T: AvroSchema + Into<Value>,
688{
689    /// Write the value to the writer
690    ///
691    /// Returns the number of bytes written.
692    ///
693    /// Each call writes a complete single-object encoded message (header + data),
694    /// making each message independently decodable.
695    pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
696        writer
697            .write_all(&self.header)
698            .map_err(Details::WriteBytes)?;
699        let value: Value = data.into();
700        let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
701        Ok(bytes + self.header.len())
702    }
703}
704
705impl<T> SpecificSingleObjectWriter<T>
706where
707    T: AvroSchema + Serialize,
708{
709    /// Write the object to the writer.
710    ///
711    /// Returns the number of bytes written.
712    ///
713    /// Each call writes a complete single-object encoded message (header + data),
714    /// making each message independently decodable.
715    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
716        writer
717            .write_all(&self.header)
718            .map_err(Details::WriteBytes)?;
719
720        let bytes = write_avro_datum_ref(
721            self.resolved.get_root_schema(),
722            self.resolved.get_names(),
723            data,
724            writer,
725        )?;
726
727        Ok(bytes + self.header.len())
728    }
729
730    /// Write the object to the writer.
731    ///
732    /// Returns the number of bytes written.
733    ///
734    /// Each call writes a complete single-object encoded message (header + data),
735    /// making each message independently decodable.
736    pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
737        self.write_ref(&data, writer)
738    }
739}
740
741fn write_value_ref_owned_resolved<W: Write>(
742    resolved_schema: &ResolvedOwnedSchema,
743    value: &Value,
744    writer: &mut W,
745) -> AvroResult<usize> {
746    let root_schema = resolved_schema.get_root_schema();
747    if let Some(reason) = value.validate_internal(
748        root_schema,
749        resolved_schema.get_names(),
750        &root_schema.namespace(),
751    ) {
752        return Err(Details::ValidationWithReason {
753            value: value.clone(),
754            schema: root_schema.clone(),
755            reason,
756        }
757        .into());
758    }
759    encode_internal(
760        value,
761        root_schema,
762        resolved_schema.get_names(),
763        &root_schema.namespace(),
764        writer,
765    )
766}
767
768/// Encode a value into raw Avro data, also performs schema validation.
769///
770/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
771/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
772/// you are doing, instead.
773pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
774    let mut buffer = Vec::new();
775    write_avro_datum(schema, value, &mut buffer)?;
776    Ok(buffer)
777}
778
779/// Write the referenced [Serialize]able object to the provided [Write] object.
780///
781/// Returns a result with the number of bytes written.
782///
783/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
784/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
785/// if you don't know what you are doing, instead.
786pub fn write_avro_datum_ref<T: Serialize, W: Write>(
787    schema: &Schema,
788    names: &NamesRef,
789    data: &T,
790    writer: &mut W,
791) -> AvroResult<usize> {
792    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
793    data.serialize(&mut serializer)
794}
795
796/// Encode a value into raw Avro data, also performs schema validation.
797///
798/// If the provided `schema` is incomplete then its dependencies must be
799/// provided in `schemata`
800pub fn to_avro_datum_schemata<T: Into<Value>>(
801    schema: &Schema,
802    schemata: Vec<&Schema>,
803    value: T,
804) -> AvroResult<Vec<u8>> {
805    let mut buffer = Vec::new();
806    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
807    Ok(buffer)
808}
809
810#[cfg(not(target_arch = "wasm32"))]
811fn generate_sync_marker() -> [u8; 16] {
812    let mut marker = [0_u8; 16];
813    std::iter::repeat_with(rand::random)
814        .take(16)
815        .enumerate()
816        .for_each(|(i, n)| marker[i] = n);
817    marker
818}
819
/// Produce a 16-byte sync marker from four `quad_rand::rand` draws, each stored as its four
/// big-endian bytes.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
830
831#[cfg(test)]
832mod tests {
833    use std::{cell::RefCell, rc::Rc};
834
835    use super::*;
836    use crate::{
837        Reader,
838        decimal::Decimal,
839        duration::{Days, Duration, Millis, Months},
840        headers::GlueSchemaUuidHeader,
841        rabin::Rabin,
842        schema::{DecimalSchema, FixedSchema, Name},
843        types::Record,
844        util::zig_i64,
845    };
846    use pretty_assertions::assert_eq;
847    use serde::{Deserialize, Serialize};
848    use uuid::Uuid;
849
850    use crate::schema::InnerDecimalSchema;
851    use crate::{codec::DeflateSettings, error::Details};
852    use apache_avro_test_helper::TestResult;
853
    /// Length in bytes of the `AVRO_OBJECT_HEADER` magic that the tests expect at the start of
    /// the writer output.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    /// Record schema used by most tests: a `long` field `a` (default 42) and a `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// Union schema used by the union tests: branch 0 is `null`, branch 1 is `long`.
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
875
876    #[test]
877    fn test_to_avro_datum() -> TestResult {
878        let schema = Schema::parse_str(SCHEMA)?;
879        let mut record = Record::new(&schema).unwrap();
880        record.put("a", 27i64);
881        record.put("b", "foo");
882
883        let mut expected = Vec::new();
884        zig_i64(27, &mut expected)?;
885        zig_i64(3, &mut expected)?;
886        expected.extend([b'f', b'o', b'o']);
887
888        assert_eq!(to_avro_datum(&schema, record)?, expected);
889
890        Ok(())
891    }
892
893    #[test]
894    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
895        #[derive(Serialize)]
896        struct TestStruct {
897            a: i64,
898            b: String,
899        }
900
901        let schema = Schema::parse_str(SCHEMA)?;
902        let mut writer: Vec<u8> = Vec::new();
903        let data = TestStruct {
904            a: 27,
905            b: "foo".to_string(),
906        };
907
908        let mut expected = Vec::new();
909        zig_i64(27, &mut expected)?;
910        zig_i64(3, &mut expected)?;
911        expected.extend([b'f', b'o', b'o']);
912
913        let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
914
915        assert_eq!(bytes, expected.len());
916        assert_eq!(writer, expected);
917
918        Ok(())
919    }
920
921    #[test]
922    fn avro_rs_220_flush_write_header() -> TestResult {
923        let schema = Schema::parse_str(SCHEMA)?;
924
925        // By default flush should write the header even if nothing was added yet
926        let mut writer = Writer::new(&schema, Vec::new())?;
927        writer.flush()?;
928        let result = writer.into_inner()?;
929        assert_eq!(result.len(), 147);
930
931        // Unless the user indicates via the builder that the header has already been written
932        let mut writer = Writer::builder()
933            .has_header(true)
934            .schema(&schema)
935            .writer(Vec::new())
936            .build()?;
937        writer.flush()?;
938        let result = writer.into_inner()?;
939        assert_eq!(result.len(), 0);
940
941        Ok(())
942    }
943
944    #[test]
945    fn test_union_not_null() -> TestResult {
946        let schema = Schema::parse_str(UNION_SCHEMA)?;
947        let union = Value::Union(1, Box::new(Value::Long(3)));
948
949        let mut expected = Vec::new();
950        zig_i64(1, &mut expected)?;
951        zig_i64(3, &mut expected)?;
952
953        assert_eq!(to_avro_datum(&schema, union)?, expected);
954
955        Ok(())
956    }
957
958    #[test]
959    fn test_union_null() -> TestResult {
960        let schema = Schema::parse_str(UNION_SCHEMA)?;
961        let union = Value::Union(0, Box::new(Value::Null));
962
963        let mut expected = Vec::new();
964        zig_i64(0, &mut expected)?;
965
966        assert_eq!(to_avro_datum(&schema, union)?, expected);
967
968        Ok(())
969    }
970
971    fn logical_type_test<T: Into<Value> + Clone>(
972        schema_str: &'static str,
973
974        expected_schema: &Schema,
975        value: Value,
976
977        raw_schema: &Schema,
978        raw_value: T,
979    ) -> TestResult {
980        let schema = Schema::parse_str(schema_str)?;
981        assert_eq!(&schema, expected_schema);
982        // The serialized format should be the same as the schema.
983        let ser = to_avro_datum(&schema, value.clone())?;
984        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
985        assert_eq!(ser, raw_ser);
986
987        // Should deserialize from the schema into the logical type.
988        let mut r = ser.as_slice();
989        let de = crate::from_avro_datum(&schema, &mut r, None)?;
990        assert_eq!(de, value);
991        Ok(())
992    }
993
994    #[test]
995    fn date() -> TestResult {
996        logical_type_test(
997            r#"{"type": "int", "logicalType": "date"}"#,
998            &Schema::Date,
999            Value::Date(1_i32),
1000            &Schema::Int,
1001            1_i32,
1002        )
1003    }
1004
1005    #[test]
1006    fn time_millis() -> TestResult {
1007        logical_type_test(
1008            r#"{"type": "int", "logicalType": "time-millis"}"#,
1009            &Schema::TimeMillis,
1010            Value::TimeMillis(1_i32),
1011            &Schema::Int,
1012            1_i32,
1013        )
1014    }
1015
1016    #[test]
1017    fn time_micros() -> TestResult {
1018        logical_type_test(
1019            r#"{"type": "long", "logicalType": "time-micros"}"#,
1020            &Schema::TimeMicros,
1021            Value::TimeMicros(1_i64),
1022            &Schema::Long,
1023            1_i64,
1024        )
1025    }
1026
1027    #[test]
1028    fn timestamp_millis() -> TestResult {
1029        logical_type_test(
1030            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
1031            &Schema::TimestampMillis,
1032            Value::TimestampMillis(1_i64),
1033            &Schema::Long,
1034            1_i64,
1035        )
1036    }
1037
1038    #[test]
1039    fn timestamp_micros() -> TestResult {
1040        logical_type_test(
1041            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
1042            &Schema::TimestampMicros,
1043            Value::TimestampMicros(1_i64),
1044            &Schema::Long,
1045            1_i64,
1046        )
1047    }
1048
1049    #[test]
1050    fn decimal_fixed() -> TestResult {
1051        let size = 30;
1052        let fixed = FixedSchema {
1053            name: Name::new("decimal")?,
1054            aliases: None,
1055            doc: None,
1056            size,
1057            attributes: Default::default(),
1058        };
1059        let inner = InnerDecimalSchema::Fixed(fixed.clone());
1060        let value = vec![0u8; size];
1061        logical_type_test(
1062            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1063            &Schema::Decimal(DecimalSchema {
1064                precision: 20,
1065                scale: 5,
1066                inner,
1067            }),
1068            Value::Decimal(Decimal::from(value.clone())),
1069            &Schema::Fixed(fixed),
1070            Value::Fixed(size, value),
1071        )
1072    }
1073
1074    #[test]
1075    fn decimal_bytes() -> TestResult {
1076        let value = vec![0u8; 10];
1077        logical_type_test(
1078            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1079            &Schema::Decimal(DecimalSchema {
1080                precision: 4,
1081                scale: 3,
1082                inner: InnerDecimalSchema::Bytes,
1083            }),
1084            Value::Decimal(Decimal::from(value.clone())),
1085            &Schema::Bytes,
1086            value,
1087        )
1088    }
1089
1090    #[test]
1091    fn duration() -> TestResult {
1092        let inner = Schema::Fixed(FixedSchema {
1093            name: Name::new("duration")?,
1094            aliases: None,
1095            doc: None,
1096            size: 12,
1097            attributes: Default::default(),
1098        });
1099        let value = Value::Duration(Duration::new(
1100            Months::new(256),
1101            Days::new(512),
1102            Millis::new(1024),
1103        ));
1104        logical_type_test(
1105            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1106            &Schema::Duration(FixedSchema {
1107                name: Name::try_from("duration").expect("Name is valid"),
1108                aliases: None,
1109                doc: None,
1110                size: 12,
1111                attributes: Default::default(),
1112            }),
1113            value,
1114            &inner,
1115            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1116        )
1117    }
1118
1119    #[test]
1120    fn test_writer_append() -> TestResult {
1121        let schema = Schema::parse_str(SCHEMA)?;
1122        let mut writer = Writer::new(&schema, Vec::new())?;
1123
1124        let mut record = Record::new(&schema).unwrap();
1125        record.put("a", 27i64);
1126        record.put("b", "foo");
1127
1128        let n1 = writer.append_value(record.clone())?;
1129        let n2 = writer.append_value(record.clone())?;
1130        let n3 = writer.flush()?;
1131        let result = writer.into_inner()?;
1132
1133        assert_eq!(n1 + n2 + n3, result.len());
1134
1135        let mut data = Vec::new();
1136        zig_i64(27, &mut data)?;
1137        zig_i64(3, &mut data)?;
1138        data.extend(b"foo");
1139        data.extend(data.clone());
1140
1141        // starts with magic
1142        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1143        // ends with data and sync marker
1144        let last_data_byte = result.len() - 16;
1145        assert_eq!(
1146            &result[last_data_byte - data.len()..last_data_byte],
1147            data.as_slice()
1148        );
1149
1150        Ok(())
1151    }
1152
1153    #[test]
1154    fn test_writer_extend() -> TestResult {
1155        let schema = Schema::parse_str(SCHEMA)?;
1156        let mut writer = Writer::new(&schema, Vec::new())?;
1157
1158        let mut record = Record::new(&schema).unwrap();
1159        record.put("a", 27i64);
1160        record.put("b", "foo");
1161        let record_copy = record.clone();
1162        let records = vec![record, record_copy];
1163
1164        let n1 = writer.extend(records)?;
1165        let n2 = writer.flush()?;
1166        let result = writer.into_inner()?;
1167
1168        assert_eq!(n1 + n2, result.len());
1169
1170        let mut data = Vec::new();
1171        zig_i64(27, &mut data)?;
1172        zig_i64(3, &mut data)?;
1173        data.extend(b"foo");
1174        data.extend(data.clone());
1175
1176        // starts with magic
1177        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1178        // ends with data and sync marker
1179        let last_data_byte = result.len() - 16;
1180        assert_eq!(
1181            &result[last_data_byte - data.len()..last_data_byte],
1182            data.as_slice()
1183        );
1184
1185        Ok(())
1186    }
1187
    /// Serde counterpart of the `SCHEMA` record, used by the `*_ser` writer tests.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        // Corresponds to the `long` field `a` of `SCHEMA`.
        a: i64,
        // Corresponds to the `string` field `b` of `SCHEMA`.
        b: String,
    }
1193
1194    #[test]
1195    fn test_writer_append_ser() -> TestResult {
1196        let schema = Schema::parse_str(SCHEMA)?;
1197        let mut writer = Writer::new(&schema, Vec::new())?;
1198
1199        let record = TestSerdeSerialize {
1200            a: 27,
1201            b: "foo".to_owned(),
1202        };
1203
1204        let n1 = writer.append_ser(record)?;
1205        let n2 = writer.flush()?;
1206        let result = writer.into_inner()?;
1207
1208        assert_eq!(n1 + n2, result.len());
1209
1210        let mut data = Vec::new();
1211        zig_i64(27, &mut data)?;
1212        zig_i64(3, &mut data)?;
1213        data.extend(b"foo");
1214
1215        // starts with magic
1216        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1217        // ends with data and sync marker
1218        let last_data_byte = result.len() - 16;
1219        assert_eq!(
1220            &result[last_data_byte - data.len()..last_data_byte],
1221            data.as_slice()
1222        );
1223
1224        Ok(())
1225    }
1226
1227    #[test]
1228    fn test_writer_extend_ser() -> TestResult {
1229        let schema = Schema::parse_str(SCHEMA)?;
1230        let mut writer = Writer::new(&schema, Vec::new())?;
1231
1232        let record = TestSerdeSerialize {
1233            a: 27,
1234            b: "foo".to_owned(),
1235        };
1236        let record_copy = record.clone();
1237        let records = vec![record, record_copy];
1238
1239        let n1 = writer.extend_ser(records)?;
1240        let n2 = writer.flush()?;
1241        let result = writer.into_inner()?;
1242
1243        assert_eq!(n1 + n2, result.len());
1244
1245        let mut data = Vec::new();
1246        zig_i64(27, &mut data)?;
1247        zig_i64(3, &mut data)?;
1248        data.extend(b"foo");
1249        data.extend(data.clone());
1250
1251        // starts with magic
1252        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1253        // ends with data and sync marker
1254        let last_data_byte = result.len() - 16;
1255        assert_eq!(
1256            &result[last_data_byte - data.len()..last_data_byte],
1257            data.as_slice()
1258        );
1259
1260        Ok(())
1261    }
1262
1263    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1264        Writer::with_codec(
1265            schema,
1266            Vec::new(),
1267            Codec::Deflate(DeflateSettings::default()),
1268        )
1269    }
1270
1271    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1272        Writer::builder()
1273            .writer(Vec::new())
1274            .schema(schema)
1275            .codec(Codec::Deflate(DeflateSettings::default()))
1276            .block_size(100)
1277            .build()
1278    }
1279
1280    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1281        let mut record = Record::new(schema).unwrap();
1282        record.put("a", 27i64);
1283        record.put("b", "foo");
1284
1285        let n1 = writer.append_value(record.clone())?;
1286        let n2 = writer.append_value(record.clone())?;
1287        let n3 = writer.flush()?;
1288        let result = writer.into_inner()?;
1289
1290        assert_eq!(n1 + n2 + n3, result.len());
1291
1292        let mut data = Vec::new();
1293        zig_i64(27, &mut data)?;
1294        zig_i64(3, &mut data)?;
1295        data.extend(b"foo");
1296        data.extend(data.clone());
1297        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1298
1299        // starts with magic
1300        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1301        // ends with data and sync marker
1302        let last_data_byte = result.len() - 16;
1303        assert_eq!(
1304            &result[last_data_byte - data.len()..last_data_byte],
1305            data.as_slice()
1306        );
1307
1308        Ok(())
1309    }
1310
1311    #[test]
1312    fn test_writer_with_codec() -> TestResult {
1313        let schema = Schema::parse_str(SCHEMA)?;
1314        let writer = make_writer_with_codec(&schema)?;
1315        check_writer(writer, &schema)
1316    }
1317
1318    #[test]
1319    fn test_writer_with_builder() -> TestResult {
1320        let schema = Schema::parse_str(SCHEMA)?;
1321        let writer = make_writer_with_builder(&schema)?;
1322        check_writer(writer, &schema)
1323    }
1324
1325    #[test]
1326    fn test_logical_writer() -> TestResult {
1327        const LOGICAL_TYPE_SCHEMA: &str = r#"
1328        {
1329          "type": "record",
1330          "name": "logical_type_test",
1331          "fields": [
1332            {
1333              "name": "a",
1334              "type": [
1335                "null",
1336                {
1337                  "type": "long",
1338                  "logicalType": "timestamp-micros"
1339                }
1340              ]
1341            }
1342          ]
1343        }
1344        "#;
1345        let codec = Codec::Deflate(DeflateSettings::default());
1346        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1347        let mut writer = Writer::builder()
1348            .schema(&schema)
1349            .codec(codec)
1350            .writer(Vec::new())
1351            .build()?;
1352
1353        let mut record1 = Record::new(&schema).unwrap();
1354        record1.put(
1355            "a",
1356            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1357        );
1358
1359        let mut record2 = Record::new(&schema).unwrap();
1360        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1361
1362        let n1 = writer.append_value(record1)?;
1363        let n2 = writer.append_value(record2)?;
1364        let n3 = writer.flush()?;
1365        let result = writer.into_inner()?;
1366
1367        assert_eq!(n1 + n2 + n3, result.len());
1368
1369        let mut data = Vec::new();
1370        // byte indicating not null
1371        zig_i64(1, &mut data)?;
1372        zig_i64(1234, &mut data)?;
1373
1374        // byte indicating null
1375        zig_i64(0, &mut data)?;
1376        codec.compress(&mut data)?;
1377
1378        // starts with magic
1379        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1380        // ends with data and sync marker
1381        let last_data_byte = result.len() - 16;
1382        assert_eq!(
1383            &result[last_data_byte - data.len()..last_data_byte],
1384            data.as_slice()
1385        );
1386
1387        Ok(())
1388    }
1389
1390    #[test]
1391    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1392        let schema = Schema::parse_str(SCHEMA)?;
1393        let mut writer = Writer::new(&schema, Vec::new())?;
1394
1395        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1396        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1397        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1398        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1399
1400        let mut record = Record::new(&schema).unwrap();
1401        record.put("a", 27i64);
1402        record.put("b", "foo");
1403
1404        writer.append_value(record.clone())?;
1405        writer.append_value(record.clone())?;
1406        writer.flush()?;
1407        let result = writer.into_inner()?;
1408
1409        assert_eq!(result.len(), 244);
1410
1411        Ok(())
1412    }
1413
1414    #[test]
1415    fn test_avro_3881_metadata_empty_body() -> TestResult {
1416        let schema = Schema::parse_str(SCHEMA)?;
1417        let mut writer = Writer::new(&schema, Vec::new())?;
1418        writer.add_user_metadata("a".to_string(), "b")?;
1419        let result = writer.into_inner()?;
1420
1421        let reader = Reader::with_schema(&schema, &result[..])?;
1422        let mut expected = HashMap::new();
1423        expected.insert("a".to_string(), vec![b'b']);
1424        assert_eq!(reader.user_metadata(), &expected);
1425        assert_eq!(reader.into_iter().count(), 0);
1426
1427        Ok(())
1428    }
1429
1430    #[test]
1431    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1432        let schema = Schema::parse_str(SCHEMA)?;
1433        let mut writer = Writer::new(&schema, Vec::new())?;
1434
1435        let mut record = Record::new(&schema).unwrap();
1436        record.put("a", 27i64);
1437        record.put("b", "foo");
1438        writer.append_value(record.clone())?;
1439
1440        match writer
1441            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1442            .map_err(Error::into_details)
1443        {
1444            Err(e @ Details::FileHeaderAlreadyWritten) => {
1445                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1446            }
1447            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1448            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1449        }
1450
1451        Ok(())
1452    }
1453
1454    #[test]
1455    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1456        let schema = Schema::parse_str(SCHEMA)?;
1457        let mut writer = Writer::new(&schema, Vec::new())?;
1458
1459        let key = "avro.stringKey".to_string();
1460        match writer
1461            .add_user_metadata(key.clone(), "value")
1462            .map_err(Error::into_details)
1463        {
1464            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1465                assert_eq!(
1466                    e.to_string(),
1467                    format!(
1468                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1469                    )
1470                )
1471            }
1472            Err(e) => panic!(
1473                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1474            ),
1475            Ok(_) => {
1476                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1477            }
1478        }
1479
1480        Ok(())
1481    }
1482
1483    #[test]
1484    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1485        let schema = Schema::parse_str(SCHEMA)?;
1486
1487        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1488        user_meta_data.insert(
1489            "stringKey".to_string(),
1490            Value::String("stringValue".to_string()),
1491        );
1492        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1493        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1494
1495        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1496            .writer(Vec::new())
1497            .schema(&schema)
1498            .user_metadata(user_meta_data.clone())
1499            .build()?;
1500
1501        assert_eq!(writer.user_metadata, user_meta_data);
1502
1503        Ok(())
1504    }
1505
    /// Test payload for the single-object writer tests; its Avro schema is supplied by the
    /// `AvroSchema` impl in this module.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        // Declared as `long` in the schema.
        a: i64,
        // Declared as `double` in the schema.
        b: f64,
        // Declared as an array of `string` in the schema.
        c: Vec<String>,
    }
1512
1513    impl AvroSchema for TestSingleObjectWriter {
1514        fn get_schema() -> Schema {
1515            let schema = r#"
1516            {
1517                "type":"record",
1518                "name":"TestSingleObjectWrtierSerialize",
1519                "fields":[
1520                    {
1521                        "name":"a",
1522                        "type":"long"
1523                    },
1524                    {
1525                        "name":"b",
1526                        "type":"double"
1527                    },
1528                    {
1529                        "name":"c",
1530                        "type":{
1531                            "type":"array",
1532                            "items":"string"
1533                        }
1534                    }
1535                ]
1536            }
1537            "#;
1538            Schema::parse_str(schema).unwrap()
1539        }
1540    }
1541
1542    impl From<TestSingleObjectWriter> for Value {
1543        fn from(obj: TestSingleObjectWriter) -> Value {
1544            Value::Record(vec![
1545                ("a".into(), obj.a.into()),
1546                ("b".into(), obj.b.into()),
1547                (
1548                    "c".into(),
1549                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1550                ),
1551            ])
1552        }
1553    }
1554
1555    #[test]
1556    fn test_single_object_writer() -> TestResult {
1557        let mut buf: Vec<u8> = Vec::new();
1558        let obj = TestSingleObjectWriter {
1559            a: 300,
1560            b: 34.555,
1561            c: vec!["cat".into(), "dog".into()],
1562        };
1563        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1564            &TestSingleObjectWriter::get_schema(),
1565            1024,
1566        )
1567        .expect("Should resolve schema");
1568        let value = obj.into();
1569        let written_bytes = writer
1570            .write_value_ref(&value, &mut buf)
1571            .expect("Error serializing properly");
1572
1573        assert!(buf.len() > 10, "no bytes written");
1574        assert_eq!(buf.len(), written_bytes);
1575        assert_eq!(buf[0], 0xC3);
1576        assert_eq!(buf[1], 0x01);
1577        assert_eq!(
1578            &buf[2..10],
1579            &TestSingleObjectWriter::get_schema()
1580                .fingerprint::<Rabin>()
1581                .bytes[..]
1582        );
1583        let mut msg_binary = Vec::new();
1584        encode(
1585            &value,
1586            &TestSingleObjectWriter::get_schema(),
1587            &mut msg_binary,
1588        )
1589        .expect("encode should have failed by here as a dependency of any writing");
1590        assert_eq!(&buf[10..], &msg_binary[..]);
1591
1592        Ok(())
1593    }
1594
1595    #[test]
1596    fn test_single_object_writer_with_header_builder() -> TestResult {
1597        let mut buf: Vec<u8> = Vec::new();
1598        let obj = TestSingleObjectWriter {
1599            a: 300,
1600            b: 34.555,
1601            c: vec!["cat".into(), "dog".into()],
1602        };
1603        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1604        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1605        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1606            &TestSingleObjectWriter::get_schema(),
1607            1024,
1608            header_builder,
1609        )
1610        .expect("Should resolve schema");
1611        let value = obj.into();
1612        writer
1613            .write_value_ref(&value, &mut buf)
1614            .expect("Error serializing properly");
1615
1616        assert_eq!(buf[0], 0x03);
1617        assert_eq!(buf[1], 0x00);
1618        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1619        Ok(())
1620    }
1621
1622    #[test]
1623    fn test_writer_parity() -> TestResult {
1624        let obj1 = TestSingleObjectWriter {
1625            a: 300,
1626            b: 34.555,
1627            c: vec!["cat".into(), "dog".into()],
1628        };
1629
1630        let mut buf1: Vec<u8> = Vec::new();
1631        let mut buf2: Vec<u8> = Vec::new();
1632        let mut buf3: Vec<u8> = Vec::new();
1633        let mut buf4: Vec<u8> = Vec::new();
1634
1635        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1636            &TestSingleObjectWriter::get_schema(),
1637            1024,
1638        )
1639        .expect("Should resolve schema");
1640        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
1641            .expect("Resolved should pass");
1642        specific_writer
1643            .write_ref(&obj1, &mut buf1)
1644            .expect("Serialization expected");
1645        specific_writer
1646            .write_ref(&obj1, &mut buf2)
1647            .expect("Serialization expected");
1648        specific_writer
1649            .write_value(obj1.clone(), &mut buf3)
1650            .expect("Serialization expected");
1651
1652        generic_writer
1653            .write_value(obj1.into(), &mut buf4)
1654            .expect("Serialization expected");
1655
1656        assert_eq!(buf1, buf2);
1657        assert_eq!(buf2, buf3);
1658        assert_eq!(buf3, buf4);
1659
1660        Ok(())
1661    }
1662
1663    #[test]
1664    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1665        const SCHEMA: &str = r#"
1666  {
1667      "type": "record",
1668      "name": "Conference",
1669      "fields": [
1670          {"type": "string", "name": "name"},
1671          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1672      ]
1673  }"#;
1674
1675        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1676        pub struct Conference {
1677            pub name: String,
1678            pub time: Option<i64>,
1679        }
1680
1681        let conf = Conference {
1682            name: "RustConf".to_string(),
1683            time: Some(1234567890),
1684        };
1685
1686        let schema = Schema::parse_str(SCHEMA)?;
1687        let mut writer = Writer::new(&schema, Vec::new())?;
1688
1689        let bytes = writer.append_ser(conf)?;
1690
1691        assert_eq!(182, bytes);
1692
1693        Ok(())
1694    }
1695
1696    #[test]
1697    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1698        const SCHEMA: &str = r#"
1699  {
1700      "type": "record",
1701      "name": "Conference",
1702      "fields": [
1703          {"type": "string", "name": "name"},
1704          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1705      ]
1706  }"#;
1707
1708        #[derive(Debug, PartialEq, Clone, Serialize)]
1709        pub struct Conference {
1710            pub name: String,
1711            pub time: Option<f64>, // wrong type: f64 instead of i64
1712        }
1713
1714        let conf = Conference {
1715            name: "RustConf".to_string(),
1716            time: Some(12345678.90),
1717        };
1718
1719        let schema = Schema::parse_str(SCHEMA)?;
1720        let mut writer = Writer::new(&schema, Vec::new())?;
1721
1722        match writer.append_ser(conf) {
1723            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1724            Err(e) => {
1725                assert_eq!(
1726                    e.to_string(),
1727                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1728                );
1729            }
1730        }
1731        Ok(())
1732    }
1733
1734    #[test]
1735    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1736        const SCHEMA: &str = r#"
1737        {
1738            "type": "record",
1739            "name": "ExampleSchema",
1740            "fields": [
1741                {"name": "exampleField", "type": "string"}
1742            ]
1743        }
1744        "#;
1745
1746        #[derive(Clone, Default)]
1747        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1748
1749        impl TestBuffer {
1750            fn len(&self) -> usize {
1751                self.0.borrow().len()
1752            }
1753        }
1754
1755        impl Write for TestBuffer {
1756            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1757                self.0.borrow_mut().write(buf)
1758            }
1759
1760            fn flush(&mut self) -> std::io::Result<()> {
1761                Ok(())
1762            }
1763        }
1764
1765        let shared_buffer = TestBuffer::default();
1766
1767        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1768
1769        let schema = Schema::parse_str(SCHEMA)?;
1770
1771        let mut writer = Writer::new(&schema, buffered_writer)?;
1772
1773        let mut record = Record::new(writer.schema()).unwrap();
1774        record.put("exampleField", "value");
1775
1776        writer.append_value(record)?;
1777        writer.flush()?;
1778
1779        assert_eq!(
1780            shared_buffer.len(),
1781            151,
1782            "the test buffer was not fully written to after Writer::flush was called"
1783        );
1784
1785        Ok(())
1786    }
1787
1788    #[test]
1789    fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
1790        #[derive(Serialize)]
1791        struct Recursive {
1792            field: bool,
1793            recurse: Option<Box<Recursive>>,
1794        }
1795
1796        impl AvroSchema for Recursive {
1797            fn get_schema() -> Schema {
1798                Schema::parse_str(
1799                    r#"{
1800                    "name": "Recursive",
1801                    "type": "record",
1802                    "fields": [
1803                        { "name": "field", "type": "boolean" },
1804                        { "name": "recurse", "type": ["null", "Recursive"] }
1805                    ]
1806                }"#,
1807                )
1808                .unwrap()
1809            }
1810        }
1811
1812        let mut buffer = Vec::new();
1813        let writer = SpecificSingleObjectWriter::new()?;
1814
1815        writer.write(
1816            Recursive {
1817                field: true,
1818                recurse: Some(Box::new(Recursive {
1819                    field: false,
1820                    recurse: None,
1821                })),
1822            },
1823            &mut buffer,
1824        )?;
1825        assert_eq!(
1826            buffer,
1827            &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
1828        );
1829
1830        Ok(())
1831    }
1832
1833    #[test]
1834    fn avro_rs_310_append_unvalidated_value() -> TestResult {
1835        let schema = Schema::String;
1836        let value = Value::Int(1);
1837
1838        let mut writer = Writer::new(&schema, Vec::new())?;
1839        writer.unvalidated_append_value_ref(&value)?;
1840        writer.unvalidated_append_value(value)?;
1841        let buffer = writer.into_inner()?;
1842
1843        // Check the last two bytes for the sync marker
1844        assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);
1845
1846        let mut writer = Writer::new(&schema, Vec::new())?;
1847        let value = Value::Int(1);
1848        let err = writer.append_value_ref(&value).unwrap_err();
1849        assert_eq!(
1850            err.to_string(),
1851            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1852        );
1853        let err = writer.append_value(value).unwrap_err();
1854        assert_eq!(
1855            err.to_string(),
1856            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1857        );
1858
1859        Ok(())
1860    }
1861}