apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    AvroResult, Codec, Error,
21    encode::{encode, encode_internal, encode_to_vec},
22    error::Details,
23    headers::{HeaderBuilder, RabinFingerprintHeader},
24    schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema},
25    serde::{AvroSchema, ser_schema::SchemaAwareWriteSerializer},
26    types::Value,
27};
28use serde::Serialize;
29use std::{
30    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
31};
32
/// Default size, in bytes, that the uncompressed block buffer may reach before a `Writer`
/// flushes it automatically.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes at the very start of every object container file: `O`, `b`, `j`, then `1`.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 1];
35
36/// Main interface for writing Avro formatted values.
37///
38/// It is critical to call flush before `Writer<W>` is dropped. Though dropping will attempt to flush
39/// the contents of the buffer, any errors that happen in the process of dropping will be ignored.
40/// Calling flush ensures that the buffer is empty and thus dropping will not even attempt file operations.
41pub struct Writer<'a, W: Write> {
42    schema: &'a Schema,
43    writer: W,
44    resolved_schema: ResolvedSchema<'a>,
45    codec: Codec,
46    block_size: usize,
47    buffer: Vec<u8>,
48    num_values: usize,
49    marker: [u8; 16],
50    has_header: bool,
51    user_metadata: HashMap<String, Value>,
52}
53
54#[bon::bon]
55impl<'a, W: Write> Writer<'a, W> {
56    #[builder]
57    pub fn builder(
58        schema: &'a Schema,
59        schemata: Option<Vec<&'a Schema>>,
60        writer: W,
61        #[builder(default = Codec::Null)] codec: Codec,
62        #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
63        #[builder(default = generate_sync_marker())] marker: [u8; 16],
64        /// Has the header already been written.
65        ///
66        /// To disable writing the header, this can be set to `true`.
67        #[builder(default = false)]
68        has_header: bool,
69        #[builder(default)] user_metadata: HashMap<String, Value>,
70    ) -> AvroResult<Self> {
71        let resolved_schema = if let Some(schemata) = schemata {
72            ResolvedSchema::try_from(schemata)?
73        } else {
74            ResolvedSchema::try_from(schema)?
75        };
76        Ok(Self {
77            schema,
78            writer,
79            resolved_schema,
80            codec,
81            block_size,
82            buffer: Vec::with_capacity(block_size),
83            num_values: 0,
84            marker,
85            has_header,
86            user_metadata,
87        })
88    }
89}
90
91impl<'a, W: Write> Writer<'a, W> {
92    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
93    /// to.
94    /// No compression `Codec` will be used.
95    pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
96        Writer::with_codec(schema, writer, Codec::Null)
97    }
98
99    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
100    /// `io::Write` trait to write to.
101    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> AvroResult<Self> {
102        Self::builder()
103            .schema(schema)
104            .writer(writer)
105            .codec(codec)
106            .build()
107    }
108
109    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
110    /// `io::Write` trait to write to.
111    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
112    /// be provided in `schemata`.
113    pub fn with_schemata(
114        schema: &'a Schema,
115        schemata: Vec<&'a Schema>,
116        writer: W,
117        codec: Codec,
118    ) -> AvroResult<Self> {
119        Self::builder()
120            .schema(schema)
121            .schemata(schemata)
122            .writer(writer)
123            .codec(codec)
124            .build()
125    }
126
127    /// Creates a `Writer` that will append values to already populated
128    /// `std::io::Write` using the provided `marker`
129    /// No compression `Codec` will be used.
130    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> AvroResult<Self> {
131        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
132    }
133
134    /// Creates a `Writer` that will append values to already populated
135    /// `std::io::Write` using the provided `marker`
136    pub fn append_to_with_codec(
137        schema: &'a Schema,
138        writer: W,
139        codec: Codec,
140        marker: [u8; 16],
141    ) -> AvroResult<Self> {
142        Self::builder()
143            .schema(schema)
144            .writer(writer)
145            .codec(codec)
146            .marker(marker)
147            .has_header(true)
148            .build()
149    }
150
151    /// Creates a `Writer` that will append values to already populated
152    /// `std::io::Write` using the provided `marker`
153    pub fn append_to_with_codec_schemata(
154        schema: &'a Schema,
155        schemata: Vec<&'a Schema>,
156        writer: W,
157        codec: Codec,
158        marker: [u8; 16],
159    ) -> AvroResult<Self> {
160        Self::builder()
161            .schema(schema)
162            .schemata(schemata)
163            .writer(writer)
164            .codec(codec)
165            .marker(marker)
166            .has_header(true)
167            .build()
168    }
169
170    /// Get a reference to the `Schema` associated to a `Writer`.
171    pub fn schema(&self) -> &'a Schema {
172        self.schema
173    }
174
175    /// Deprecated. Use [`Writer::append_value`] instead.
176    #[deprecated(since = "0.22.0", note = "Use `Writer::append_value` instead")]
177    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
178        self.append_value(value)
179    }
180
181    /// Append a compatible value to a `Writer`, also performing schema validation.
182    ///
183    /// Returns the number of bytes written (it might be 0, see below).
184    ///
185    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
186    /// internal buffering for performance reasons. If you want to be sure the value has been
187    /// written, then call [`flush`](Writer::flush).
188    pub fn append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
189        let avro = value.into();
190        self.append_value_ref(&avro)
191    }
192
193    /// Append a compatible value to a `Writer`, also performing schema validation.
194    ///
195    /// Returns the number of bytes written (it might be 0, see below).
196    ///
197    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
198    /// internal buffering for performance reasons. If you want to be sure the value has been
199    /// written, then call [`flush`](Writer::flush).
200    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
201        if let Some(reason) = value.validate_internal(
202            self.schema,
203            self.resolved_schema.get_names(),
204            &self.schema.namespace(),
205        ) {
206            return Err(Details::ValidationWithReason {
207                value: value.clone(),
208                schema: self.schema.clone(),
209                reason,
210            }
211            .into());
212        }
213        self.unvalidated_append_value_ref(value)
214    }
215
216    /// Append a compatible value to a `Writer`.
217    ///
218    /// This function does **not** validate that the provided value matches the schema. If it does
219    /// not match, the file will contain corrupt data. Use [`Writer::append_value`] to have the
220    /// value validated during write or use [`Value::validate`] to validate the value.
221    ///
222    /// Returns the number of bytes written (it might be 0, see below).
223    ///
224    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
225    /// internal buffering for performance reasons. If you want to be sure the value has been
226    /// written, then call [`flush`](Writer::flush).
227    pub fn unvalidated_append_value<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
228        let value = value.into();
229        self.unvalidated_append_value_ref(&value)
230    }
231
232    /// Append a compatible value to a `Writer`.
233    ///
234    /// This function does **not** validate that the provided value matches the schema. If it does
235    /// not match, the file will contain corrupt data. Use [`Writer::append_value_ref`] to have the
236    /// value validated during write or use [`Value::validate`] to validate the value.
237    ///
238    /// Returns the number of bytes written (it might be 0, see below).
239    ///
240    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
241    /// internal buffering for performance reasons. If you want to be sure the value has been
242    /// written, then call [`flush`](Writer::flush).
243    pub fn unvalidated_append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
244        let n = self.maybe_write_header()?;
245        encode_internal(
246            value,
247            self.schema,
248            self.resolved_schema.get_names(),
249            &self.schema.namespace(),
250            &mut self.buffer,
251        )?;
252
253        self.num_values += 1;
254
255        if self.buffer.len() >= self.block_size {
256            return self.flush().map(|b| b + n);
257        }
258
259        Ok(n)
260    }
261
262    /// Append anything implementing the `Serialize` trait to a `Writer` for
263    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
264    /// validation.
265    ///
266    /// Returns the number of bytes written.
267    ///
268    /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
269    /// internal buffering for performance reasons. If you want to be sure the value has been
270    /// written, then call [`flush`](Writer::flush).
271    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
272        let n = self.maybe_write_header()?;
273
274        let mut serializer = SchemaAwareWriteSerializer::new(
275            &mut self.buffer,
276            self.schema,
277            self.resolved_schema.get_names(),
278            None,
279        );
280        value.serialize(&mut serializer)?;
281        self.num_values += 1;
282
283        if self.buffer.len() >= self.block_size {
284            return self.flush().map(|b| b + n);
285        }
286
287        Ok(n)
288    }
289
290    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
291    /// trait), also performing schema validation.
292    ///
293    /// Returns the number of bytes written.
294    ///
295    /// **NOTE**: This function forces the written data to be flushed (an implicit
296    /// call to [`flush`](Writer::flush) is performed).
297    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
298    where
299        I: IntoIterator<Item = T>,
300    {
301        /*
302        https://github.com/rust-lang/rfcs/issues/811 :(
303        let mut stream = values
304            .filter_map(|value| value.serialize(&mut self.serializer).ok())
305            .map(|value| value.encode(self.schema))
306            .collect::<Option<Vec<_>>>()
307            .ok_or_else(|| err_msg("value does not match given schema"))?
308            .into_iter()
309            .fold(Vec::new(), |mut acc, stream| {
310                num_values += 1;
311                acc.extend(stream); acc
312            });
313        */
314
315        let mut num_bytes = 0;
316        for value in values {
317            num_bytes += self.append_value(value)?;
318        }
319        num_bytes += self.flush()?;
320
321        Ok(num_bytes)
322    }
323
324    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
325    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
326    /// validation.
327    ///
328    /// Returns the number of bytes written.
329    ///
330    /// **NOTE**: This function forces the written data to be flushed (an implicit
331    /// call to [`flush`](Writer::flush) is performed).
332    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
333    where
334        I: IntoIterator<Item = T>,
335    {
336        /*
337        https://github.com/rust-lang/rfcs/issues/811 :(
338        let mut stream = values
339            .filter_map(|value| value.serialize(&mut self.serializer).ok())
340            .map(|value| value.encode(self.schema))
341            .collect::<Option<Vec<_>>>()
342            .ok_or_else(|| err_msg("value does not match given schema"))?
343            .into_iter()
344            .fold(Vec::new(), |mut acc, stream| {
345                num_values += 1;
346                acc.extend(stream); acc
347            });
348        */
349
350        let mut num_bytes = 0;
351        for value in values {
352            num_bytes += self.append_ser(value)?;
353        }
354        num_bytes += self.flush()?;
355
356        Ok(num_bytes)
357    }
358
359    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
360    /// validation on each value appended.
361    ///
362    /// Returns the number of bytes written.
363    ///
364    /// **NOTE**: This function forces the written data to be flushed (an implicit
365    /// call to [`flush`](Writer::flush) is performed).
366    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
367        let mut num_bytes = 0;
368        for value in values {
369            num_bytes += self.append_value_ref(value)?;
370        }
371        num_bytes += self.flush()?;
372
373        Ok(num_bytes)
374    }
375
376    /// Flush the content to the inner `Writer`.
377    ///
378    /// Call this function to make sure all the content has been written before releasing the `Writer`.
379    /// This will also write the header if it wasn't written yet and hasn't been disabled using
380    /// [`WriterBuilder::has_header`].
381    ///
382    /// Returns the number of bytes written.
383    pub fn flush(&mut self) -> AvroResult<usize> {
384        let mut num_bytes = self.maybe_write_header()?;
385        if self.num_values == 0 {
386            return Ok(num_bytes);
387        }
388
389        self.codec.compress(&mut self.buffer)?;
390
391        let num_values = self.num_values;
392        let stream_len = self.buffer.len();
393
394        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
395            + self.append_raw(&stream_len.into(), &Schema::Long)?
396            + self
397                .writer
398                .write(self.buffer.as_ref())
399                .map_err(Details::WriteBytes)?
400            + self.append_marker()?;
401
402        self.buffer.clear();
403        self.num_values = 0;
404
405        self.writer.flush().map_err(Details::FlushWriter)?;
406
407        Ok(num_bytes)
408    }
409
410    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
411    ///
412    /// **NOTE**: This function forces the written data to be flushed (an implicit
413    /// call to [`flush`](Writer::flush) is performed).
414    pub fn into_inner(mut self) -> AvroResult<W> {
415        self.maybe_write_header()?;
416        self.flush()?;
417
418        let mut this = ManuallyDrop::new(self);
419
420        // Extract every member that is not Copy and therefore should be dropped
421        let _buffer = std::mem::take(&mut this.buffer);
422        let _user_metadata = std::mem::take(&mut this.user_metadata);
423        // SAFETY: resolved schema is not accessed after this and won't be dropped because of ManuallyDrop
424        unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
425
426        // SAFETY: double-drops are prevented by putting `this` in a ManuallyDrop that is never dropped
427        let writer = unsafe { std::ptr::read(&this.writer) };
428
429        Ok(writer)
430    }
431
432    /// Gets a reference to the underlying writer.
433    ///
434    /// **NOTE**: There is likely data still in the buffer. To have all the data
435    /// in the writer call [`flush`](Writer::flush) first.
436    pub fn get_ref(&self) -> &W {
437        &self.writer
438    }
439
440    /// Gets a mutable reference to the underlying writer.
441    ///
442    /// It is inadvisable to directly write to the underlying writer.
443    ///
444    /// **NOTE**: There is likely data still in the buffer. To have all the data
445    /// in the writer call [`flush`](Writer::flush) first.
446    pub fn get_mut(&mut self) -> &mut W {
447        &mut self.writer
448    }
449
450    /// Generate and append synchronization marker to the payload.
451    fn append_marker(&mut self) -> AvroResult<usize> {
452        // using .writer.write directly to avoid mutable borrow of self
453        // with ref borrowing of self.marker
454        self.writer
455            .write(&self.marker)
456            .map_err(|e| Details::WriteMarker(e).into())
457    }
458
459    /// Append a raw Avro Value to the payload avoiding to encode it again.
460    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
461        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
462    }
463
464    /// Append pure bytes to the payload.
465    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
466        self.writer
467            .write(bytes)
468            .map_err(|e| Details::WriteBytes(e).into())
469    }
470
471    /// Adds custom metadata to the file.
472    /// This method could be used only before adding the first record to the writer.
473    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
474        if !self.has_header {
475            if key.starts_with("avro.") {
476                return Err(Details::InvalidMetadataKey(key).into());
477            }
478            self.user_metadata
479                .insert(key, Value::Bytes(value.as_ref().to_vec()));
480            Ok(())
481        } else {
482            Err(Details::FileHeaderAlreadyWritten.into())
483        }
484    }
485
486    /// Create an Avro header based on schema, codec and sync marker.
487    fn header(&self) -> Result<Vec<u8>, Error> {
488        let schema_bytes = serde_json::to_string(self.schema)
489            .map_err(Details::ConvertJsonToString)?
490            .into_bytes();
491
492        let mut metadata = HashMap::with_capacity(2);
493        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
494        if self.codec != Codec::Null {
495            metadata.insert("avro.codec", self.codec.into());
496        }
497        match self.codec {
498            #[cfg(feature = "bzip")]
499            Codec::Bzip2(settings) => {
500                metadata.insert(
501                    "avro.codec.compression_level",
502                    Value::Bytes(vec![settings.compression_level]),
503                );
504            }
505            #[cfg(feature = "xz")]
506            Codec::Xz(settings) => {
507                metadata.insert(
508                    "avro.codec.compression_level",
509                    Value::Bytes(vec![settings.compression_level]),
510                );
511            }
512            #[cfg(feature = "zstandard")]
513            Codec::Zstandard(settings) => {
514                metadata.insert(
515                    "avro.codec.compression_level",
516                    Value::Bytes(vec![settings.compression_level]),
517                );
518            }
519            _ => {}
520        }
521
522        for (k, v) in &self.user_metadata {
523            metadata.insert(k.as_str(), v.clone());
524        }
525
526        let mut header = Vec::new();
527        header.extend_from_slice(AVRO_OBJECT_HEADER);
528        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
529        header.extend_from_slice(&self.marker);
530
531        Ok(header)
532    }
533
534    fn maybe_write_header(&mut self) -> AvroResult<usize> {
535        if !self.has_header {
536            let header = self.header()?;
537            let n = self.append_bytes(header.as_ref())?;
538            self.has_header = true;
539            Ok(n)
540        } else {
541            Ok(0)
542        }
543    }
544}
545
546impl<W: Write> Drop for Writer<'_, W> {
547    /// Drop the writer, will try to flush ignoring any errors.
548    fn drop(&mut self) {
549        let _ = self.maybe_write_header();
550        let _ = self.flush();
551    }
552}
553
554/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
555/// schema validation.
556///
557/// This is an internal function which gets the bytes buffer where to write as parameter instead of
558/// creating a new one like `to_avro_datum`.
559fn write_avro_datum<T: Into<Value>, W: Write>(
560    schema: &Schema,
561    value: T,
562    writer: &mut W,
563) -> Result<(), Error> {
564    let avro = value.into();
565    if !avro.validate(schema) {
566        return Err(Details::Validation.into());
567    }
568    encode(&avro, schema, writer)?;
569    Ok(())
570}
571
572fn write_avro_datum_schemata<T: Into<Value>>(
573    schema: &Schema,
574    schemata: Vec<&Schema>,
575    value: T,
576    buffer: &mut Vec<u8>,
577) -> AvroResult<usize> {
578    let avro = value.into();
579    let rs = ResolvedSchema::try_from(schemata)?;
580    let names = rs.get_names();
581    let enclosing_namespace = schema.namespace();
582    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
583        return Err(Details::Validation.into());
584    }
585    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
586}
587
588/// Writer that encodes messages according to the single object encoding v1 spec
589/// Uses an API similar to the current File Writer
590/// Writes all object bytes at once, and drains internal buffer
591pub struct GenericSingleObjectWriter {
592    buffer: Vec<u8>,
593    resolved: ResolvedOwnedSchema,
594}
595
596impl GenericSingleObjectWriter {
597    pub fn new_with_capacity(
598        schema: &Schema,
599        initial_buffer_cap: usize,
600    ) -> AvroResult<GenericSingleObjectWriter> {
601        let header_builder = RabinFingerprintHeader::from_schema(schema);
602        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
603    }
604
605    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
606        schema: &Schema,
607        initial_buffer_cap: usize,
608        header_builder: HB,
609    ) -> AvroResult<GenericSingleObjectWriter> {
610        let mut buffer = Vec::with_capacity(initial_buffer_cap);
611        let header = header_builder.build_header();
612        buffer.extend_from_slice(&header);
613
614        Ok(GenericSingleObjectWriter {
615            buffer,
616            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
617        })
618    }
619
620    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
621
622    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
623    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
624        let original_length = self.buffer.len();
625        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
626            Err(Details::IllegalSingleObjectWriterState.into())
627        } else {
628            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
629            writer
630                .write_all(&self.buffer)
631                .map_err(Details::WriteBytes)?;
632            let len = self.buffer.len();
633            self.buffer.truncate(original_length);
634            Ok(len)
635        }
636    }
637
638    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
639    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
640        self.write_value_ref(&v, writer)
641    }
642}
643
644/// Writer that encodes messages according to the single object encoding v1 spec
645pub struct SpecificSingleObjectWriter<T>
646where
647    T: AvroSchema,
648{
649    resolved: ResolvedOwnedSchema,
650    header: Vec<u8>,
651    _model: PhantomData<T>,
652}
653
654impl<T> SpecificSingleObjectWriter<T>
655where
656    T: AvroSchema,
657{
658    pub fn new() -> AvroResult<Self> {
659        let schema = T::get_schema();
660        let header = RabinFingerprintHeader::from_schema(&schema).build_header();
661        let resolved = ResolvedOwnedSchema::new(schema)?;
662        // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice
663        Ok(Self {
664            resolved,
665            header,
666            _model: PhantomData,
667        })
668    }
669
670    pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
671        let header = header_builder.build_header();
672        let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
673        Ok(Self {
674            resolved,
675            header,
676            _model: PhantomData,
677        })
678    }
679
680    /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead.
681    #[deprecated(since = "0.22.0", note = "Use new() instead")]
682    pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
683        Self::new()
684    }
685}
686
687impl<T> SpecificSingleObjectWriter<T>
688where
689    T: AvroSchema + Into<Value>,
690{
691    /// Write the value to the writer
692    ///
693    /// Returns the number of bytes written.
694    ///
695    /// Each call writes a complete single-object encoded message (header + data),
696    /// making each message independently decodable.
697    pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
698        writer
699            .write_all(&self.header)
700            .map_err(Details::WriteBytes)?;
701        let value: Value = data.into();
702        let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
703        Ok(bytes + self.header.len())
704    }
705}
706
707impl<T> SpecificSingleObjectWriter<T>
708where
709    T: AvroSchema + Serialize,
710{
711    /// Write the object to the writer.
712    ///
713    /// Returns the number of bytes written.
714    ///
715    /// Each call writes a complete single-object encoded message (header + data),
716    /// making each message independently decodable.
717    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
718        writer
719            .write_all(&self.header)
720            .map_err(Details::WriteBytes)?;
721
722        let bytes = write_avro_datum_ref(
723            self.resolved.get_root_schema(),
724            self.resolved.get_names(),
725            data,
726            writer,
727        )?;
728
729        Ok(bytes + self.header.len())
730    }
731
732    /// Write the object to the writer.
733    ///
734    /// Returns the number of bytes written.
735    ///
736    /// Each call writes a complete single-object encoded message (header + data),
737    /// making each message independently decodable.
738    pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
739        self.write_ref(&data, writer)
740    }
741}
742
743fn write_value_ref_owned_resolved<W: Write>(
744    resolved_schema: &ResolvedOwnedSchema,
745    value: &Value,
746    writer: &mut W,
747) -> AvroResult<usize> {
748    let root_schema = resolved_schema.get_root_schema();
749    if let Some(reason) = value.validate_internal(
750        root_schema,
751        resolved_schema.get_names(),
752        &root_schema.namespace(),
753    ) {
754        return Err(Details::ValidationWithReason {
755            value: value.clone(),
756            schema: root_schema.clone(),
757            reason,
758        }
759        .into());
760    }
761    encode_internal(
762        value,
763        root_schema,
764        resolved_schema.get_names(),
765        &root_schema.namespace(),
766        writer,
767    )
768}
769
770/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
771/// performing schema validation.
772///
773/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
774/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
775/// you are doing, instead.
776pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
777    let mut buffer = Vec::new();
778    write_avro_datum(schema, value, &mut buffer)?;
779    Ok(buffer)
780}
781
782/// Write the referenced [Serialize]able object to the provided [Write] object.
783/// Returns a result with the number of bytes written.
784///
785/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
786/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
787/// if you don't know what you are doing, instead.
788pub fn write_avro_datum_ref<T: Serialize, W: Write>(
789    schema: &Schema,
790    names: &NamesRef,
791    data: &T,
792    writer: &mut W,
793) -> AvroResult<usize> {
794    let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
795    data.serialize(&mut serializer)
796}
797
798/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
799/// performing schema validation.
800/// If the provided `schema` is incomplete then its dependencies must be
801/// provided in `schemata`
802pub fn to_avro_datum_schemata<T: Into<Value>>(
803    schema: &Schema,
804    schemata: Vec<&Schema>,
805    value: T,
806) -> AvroResult<Vec<u8>> {
807    let mut buffer = Vec::new();
808    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
809    Ok(buffer)
810}
811
812#[cfg(not(target_arch = "wasm32"))]
813fn generate_sync_marker() -> [u8; 16] {
814    let mut marker = [0_u8; 16];
815    std::iter::repeat_with(rand::random)
816        .take(16)
817        .enumerate()
818        .for_each(|(i, n)| marker[i] = n);
819    marker
820}
821
/// Generate a random 16-byte sync marker from four `quad_rand::rand()` words, each laid out
/// in big-endian byte order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
832
833#[cfg(test)]
834mod tests {
835    use std::{cell::RefCell, rc::Rc};
836
837    use super::*;
838    use crate::{
839        Reader,
840        decimal::Decimal,
841        duration::{Days, Duration, Millis, Months},
842        headers::GlueSchemaUuidHeader,
843        rabin::Rabin,
844        schema::{DecimalSchema, FixedSchema, Name},
845        types::Record,
846        util::zig_i64,
847    };
848    use pretty_assertions::assert_eq;
849    use serde::{Deserialize, Serialize};
850    use uuid::Uuid;
851
852    use crate::schema::InnerDecimalSchema;
853    use crate::{codec::DeflateSettings, error::Details};
854    use apache_avro_test_helper::TestResult;
855
    /// Length of the Avro object container magic (`AVRO_OBJECT_HEADER`); used to check that
    /// written output starts with that magic.
    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();

    /// Record schema shared by most tests: a `long` field `a` (default 42) followed by a
    /// `string` field `b`.
    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;

    /// Union of `null` (branch 0) and `long` (branch 1).
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
877
878    #[test]
879    fn test_to_avro_datum() -> TestResult {
880        let schema = Schema::parse_str(SCHEMA)?;
881        let mut record = Record::new(&schema).unwrap();
882        record.put("a", 27i64);
883        record.put("b", "foo");
884
885        let mut expected = Vec::new();
886        zig_i64(27, &mut expected)?;
887        zig_i64(3, &mut expected)?;
888        expected.extend([b'f', b'o', b'o']);
889
890        assert_eq!(to_avro_datum(&schema, record)?, expected);
891
892        Ok(())
893    }
894
895    #[test]
896    fn avro_rs_193_write_avro_datum_ref() -> TestResult {
897        #[derive(Serialize)]
898        struct TestStruct {
899            a: i64,
900            b: String,
901        }
902
903        let schema = Schema::parse_str(SCHEMA)?;
904        let mut writer: Vec<u8> = Vec::new();
905        let data = TestStruct {
906            a: 27,
907            b: "foo".to_string(),
908        };
909
910        let mut expected = Vec::new();
911        zig_i64(27, &mut expected)?;
912        zig_i64(3, &mut expected)?;
913        expected.extend([b'f', b'o', b'o']);
914
915        let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
916
917        assert_eq!(bytes, expected.len());
918        assert_eq!(writer, expected);
919
920        Ok(())
921    }
922
923    #[test]
924    fn avro_rs_220_flush_write_header() -> TestResult {
925        let schema = Schema::parse_str(SCHEMA)?;
926
927        // By default flush should write the header even if nothing was added yet
928        let mut writer = Writer::new(&schema, Vec::new())?;
929        writer.flush()?;
930        let result = writer.into_inner()?;
931        assert_eq!(result.len(), 147);
932
933        // Unless the user indicates via the builder that the header has already been written
934        let mut writer = Writer::builder()
935            .has_header(true)
936            .schema(&schema)
937            .writer(Vec::new())
938            .build()?;
939        writer.flush()?;
940        let result = writer.into_inner()?;
941        assert_eq!(result.len(), 0);
942
943        Ok(())
944    }
945
946    #[test]
947    fn test_union_not_null() -> TestResult {
948        let schema = Schema::parse_str(UNION_SCHEMA)?;
949        let union = Value::Union(1, Box::new(Value::Long(3)));
950
951        let mut expected = Vec::new();
952        zig_i64(1, &mut expected)?;
953        zig_i64(3, &mut expected)?;
954
955        assert_eq!(to_avro_datum(&schema, union)?, expected);
956
957        Ok(())
958    }
959
960    #[test]
961    fn test_union_null() -> TestResult {
962        let schema = Schema::parse_str(UNION_SCHEMA)?;
963        let union = Value::Union(0, Box::new(Value::Null));
964
965        let mut expected = Vec::new();
966        zig_i64(0, &mut expected)?;
967
968        assert_eq!(to_avro_datum(&schema, union)?, expected);
969
970        Ok(())
971    }
972
973    fn logical_type_test<T: Into<Value> + Clone>(
974        schema_str: &'static str,
975
976        expected_schema: &Schema,
977        value: Value,
978
979        raw_schema: &Schema,
980        raw_value: T,
981    ) -> TestResult {
982        let schema = Schema::parse_str(schema_str)?;
983        assert_eq!(&schema, expected_schema);
984        // The serialized format should be the same as the schema.
985        let ser = to_avro_datum(&schema, value.clone())?;
986        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
987        assert_eq!(ser, raw_ser);
988
989        // Should deserialize from the schema into the logical type.
990        let mut r = ser.as_slice();
991        let de = crate::from_avro_datum(&schema, &mut r, None)?;
992        assert_eq!(de, value);
993        Ok(())
994    }
995
996    #[test]
997    fn date() -> TestResult {
998        logical_type_test(
999            r#"{"type": "int", "logicalType": "date"}"#,
1000            &Schema::Date,
1001            Value::Date(1_i32),
1002            &Schema::Int,
1003            1_i32,
1004        )
1005    }
1006
1007    #[test]
1008    fn time_millis() -> TestResult {
1009        logical_type_test(
1010            r#"{"type": "int", "logicalType": "time-millis"}"#,
1011            &Schema::TimeMillis,
1012            Value::TimeMillis(1_i32),
1013            &Schema::Int,
1014            1_i32,
1015        )
1016    }
1017
1018    #[test]
1019    fn time_micros() -> TestResult {
1020        logical_type_test(
1021            r#"{"type": "long", "logicalType": "time-micros"}"#,
1022            &Schema::TimeMicros,
1023            Value::TimeMicros(1_i64),
1024            &Schema::Long,
1025            1_i64,
1026        )
1027    }
1028
1029    #[test]
1030    fn timestamp_millis() -> TestResult {
1031        logical_type_test(
1032            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
1033            &Schema::TimestampMillis,
1034            Value::TimestampMillis(1_i64),
1035            &Schema::Long,
1036            1_i64,
1037        )
1038    }
1039
1040    #[test]
1041    fn timestamp_micros() -> TestResult {
1042        logical_type_test(
1043            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
1044            &Schema::TimestampMicros,
1045            Value::TimestampMicros(1_i64),
1046            &Schema::Long,
1047            1_i64,
1048        )
1049    }
1050
1051    #[test]
1052    fn decimal_fixed() -> TestResult {
1053        let size = 30;
1054        let fixed = FixedSchema {
1055            name: Name::new("decimal")?,
1056            aliases: None,
1057            doc: None,
1058            size,
1059            default: None,
1060            attributes: Default::default(),
1061        };
1062        let inner = InnerDecimalSchema::Fixed(fixed.clone());
1063        let value = vec![0u8; size];
1064        logical_type_test(
1065            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
1066            &Schema::Decimal(DecimalSchema {
1067                precision: 20,
1068                scale: 5,
1069                inner,
1070            }),
1071            Value::Decimal(Decimal::from(value.clone())),
1072            &Schema::Fixed(fixed),
1073            Value::Fixed(size, value),
1074        )
1075    }
1076
1077    #[test]
1078    fn decimal_bytes() -> TestResult {
1079        let value = vec![0u8; 10];
1080        logical_type_test(
1081            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
1082            &Schema::Decimal(DecimalSchema {
1083                precision: 4,
1084                scale: 3,
1085                inner: InnerDecimalSchema::Bytes,
1086            }),
1087            Value::Decimal(Decimal::from(value.clone())),
1088            &Schema::Bytes,
1089            value,
1090        )
1091    }
1092
1093    #[test]
1094    fn duration() -> TestResult {
1095        let inner = Schema::Fixed(FixedSchema {
1096            name: Name::new("duration")?,
1097            aliases: None,
1098            doc: None,
1099            size: 12,
1100            default: None,
1101            attributes: Default::default(),
1102        });
1103        let value = Value::Duration(Duration::new(
1104            Months::new(256),
1105            Days::new(512),
1106            Millis::new(1024),
1107        ));
1108        logical_type_test(
1109            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
1110            &Schema::Duration(FixedSchema {
1111                name: Name::try_from("duration").expect("Name is valid"),
1112                aliases: None,
1113                doc: None,
1114                size: 12,
1115                default: None,
1116                attributes: Default::default(),
1117            }),
1118            value,
1119            &inner,
1120            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
1121        )
1122    }
1123
1124    #[test]
1125    fn test_writer_append() -> TestResult {
1126        let schema = Schema::parse_str(SCHEMA)?;
1127        let mut writer = Writer::new(&schema, Vec::new())?;
1128
1129        let mut record = Record::new(&schema).unwrap();
1130        record.put("a", 27i64);
1131        record.put("b", "foo");
1132
1133        let n1 = writer.append_value(record.clone())?;
1134        let n2 = writer.append_value(record.clone())?;
1135        let n3 = writer.flush()?;
1136        let result = writer.into_inner()?;
1137
1138        assert_eq!(n1 + n2 + n3, result.len());
1139
1140        let mut data = Vec::new();
1141        zig_i64(27, &mut data)?;
1142        zig_i64(3, &mut data)?;
1143        data.extend(b"foo");
1144        data.extend(data.clone());
1145
1146        // starts with magic
1147        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1148        // ends with data and sync marker
1149        let last_data_byte = result.len() - 16;
1150        assert_eq!(
1151            &result[last_data_byte - data.len()..last_data_byte],
1152            data.as_slice()
1153        );
1154
1155        Ok(())
1156    }
1157
1158    #[test]
1159    fn test_writer_extend() -> TestResult {
1160        let schema = Schema::parse_str(SCHEMA)?;
1161        let mut writer = Writer::new(&schema, Vec::new())?;
1162
1163        let mut record = Record::new(&schema).unwrap();
1164        record.put("a", 27i64);
1165        record.put("b", "foo");
1166        let record_copy = record.clone();
1167        let records = vec![record, record_copy];
1168
1169        let n1 = writer.extend(records)?;
1170        let n2 = writer.flush()?;
1171        let result = writer.into_inner()?;
1172
1173        assert_eq!(n1 + n2, result.len());
1174
1175        let mut data = Vec::new();
1176        zig_i64(27, &mut data)?;
1177        zig_i64(3, &mut data)?;
1178        data.extend(b"foo");
1179        data.extend(data.clone());
1180
1181        // starts with magic
1182        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1183        // ends with data and sync marker
1184        let last_data_byte = result.len() - 16;
1185        assert_eq!(
1186            &result[last_data_byte - data.len()..last_data_byte],
1187            data.as_slice()
1188        );
1189
1190        Ok(())
1191    }
1192
    /// Serde counterpart of `SCHEMA`: fields `a` and `b`, declared in the same order as the
    /// schema's fields.
    #[derive(Debug, Clone, Deserialize, Serialize)]
    struct TestSerdeSerialize {
        a: i64,
        b: String,
    }
1198
1199    #[test]
1200    fn test_writer_append_ser() -> TestResult {
1201        let schema = Schema::parse_str(SCHEMA)?;
1202        let mut writer = Writer::new(&schema, Vec::new())?;
1203
1204        let record = TestSerdeSerialize {
1205            a: 27,
1206            b: "foo".to_owned(),
1207        };
1208
1209        let n1 = writer.append_ser(record)?;
1210        let n2 = writer.flush()?;
1211        let result = writer.into_inner()?;
1212
1213        assert_eq!(n1 + n2, result.len());
1214
1215        let mut data = Vec::new();
1216        zig_i64(27, &mut data)?;
1217        zig_i64(3, &mut data)?;
1218        data.extend(b"foo");
1219
1220        // starts with magic
1221        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1222        // ends with data and sync marker
1223        let last_data_byte = result.len() - 16;
1224        assert_eq!(
1225            &result[last_data_byte - data.len()..last_data_byte],
1226            data.as_slice()
1227        );
1228
1229        Ok(())
1230    }
1231
1232    #[test]
1233    fn test_writer_extend_ser() -> TestResult {
1234        let schema = Schema::parse_str(SCHEMA)?;
1235        let mut writer = Writer::new(&schema, Vec::new())?;
1236
1237        let record = TestSerdeSerialize {
1238            a: 27,
1239            b: "foo".to_owned(),
1240        };
1241        let record_copy = record.clone();
1242        let records = vec![record, record_copy];
1243
1244        let n1 = writer.extend_ser(records)?;
1245        let n2 = writer.flush()?;
1246        let result = writer.into_inner()?;
1247
1248        assert_eq!(n1 + n2, result.len());
1249
1250        let mut data = Vec::new();
1251        zig_i64(27, &mut data)?;
1252        zig_i64(3, &mut data)?;
1253        data.extend(b"foo");
1254        data.extend(data.clone());
1255
1256        // starts with magic
1257        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1258        // ends with data and sync marker
1259        let last_data_byte = result.len() - 16;
1260        assert_eq!(
1261            &result[last_data_byte - data.len()..last_data_byte],
1262            data.as_slice()
1263        );
1264
1265        Ok(())
1266    }
1267
1268    fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1269        Writer::with_codec(
1270            schema,
1271            Vec::new(),
1272            Codec::Deflate(DeflateSettings::default()),
1273        )
1274    }
1275
1276    fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_, Vec<u8>>> {
1277        Writer::builder()
1278            .writer(Vec::new())
1279            .schema(schema)
1280            .codec(Codec::Deflate(DeflateSettings::default()))
1281            .block_size(100)
1282            .build()
1283    }
1284
1285    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1286        let mut record = Record::new(schema).unwrap();
1287        record.put("a", 27i64);
1288        record.put("b", "foo");
1289
1290        let n1 = writer.append_value(record.clone())?;
1291        let n2 = writer.append_value(record.clone())?;
1292        let n3 = writer.flush()?;
1293        let result = writer.into_inner()?;
1294
1295        assert_eq!(n1 + n2 + n3, result.len());
1296
1297        let mut data = Vec::new();
1298        zig_i64(27, &mut data)?;
1299        zig_i64(3, &mut data)?;
1300        data.extend(b"foo");
1301        data.extend(data.clone());
1302        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1303
1304        // starts with magic
1305        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1306        // ends with data and sync marker
1307        let last_data_byte = result.len() - 16;
1308        assert_eq!(
1309            &result[last_data_byte - data.len()..last_data_byte],
1310            data.as_slice()
1311        );
1312
1313        Ok(())
1314    }
1315
1316    #[test]
1317    fn test_writer_with_codec() -> TestResult {
1318        let schema = Schema::parse_str(SCHEMA)?;
1319        let writer = make_writer_with_codec(&schema)?;
1320        check_writer(writer, &schema)
1321    }
1322
1323    #[test]
1324    fn test_writer_with_builder() -> TestResult {
1325        let schema = Schema::parse_str(SCHEMA)?;
1326        let writer = make_writer_with_builder(&schema)?;
1327        check_writer(writer, &schema)
1328    }
1329
1330    #[test]
1331    fn test_logical_writer() -> TestResult {
1332        const LOGICAL_TYPE_SCHEMA: &str = r#"
1333        {
1334          "type": "record",
1335          "name": "logical_type_test",
1336          "fields": [
1337            {
1338              "name": "a",
1339              "type": [
1340                "null",
1341                {
1342                  "type": "long",
1343                  "logicalType": "timestamp-micros"
1344                }
1345              ]
1346            }
1347          ]
1348        }
1349        "#;
1350        let codec = Codec::Deflate(DeflateSettings::default());
1351        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1352        let mut writer = Writer::builder()
1353            .schema(&schema)
1354            .codec(codec)
1355            .writer(Vec::new())
1356            .build()?;
1357
1358        let mut record1 = Record::new(&schema).unwrap();
1359        record1.put(
1360            "a",
1361            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1362        );
1363
1364        let mut record2 = Record::new(&schema).unwrap();
1365        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1366
1367        let n1 = writer.append_value(record1)?;
1368        let n2 = writer.append_value(record2)?;
1369        let n3 = writer.flush()?;
1370        let result = writer.into_inner()?;
1371
1372        assert_eq!(n1 + n2 + n3, result.len());
1373
1374        let mut data = Vec::new();
1375        // byte indicating not null
1376        zig_i64(1, &mut data)?;
1377        zig_i64(1234, &mut data)?;
1378
1379        // byte indicating null
1380        zig_i64(0, &mut data)?;
1381        codec.compress(&mut data)?;
1382
1383        // starts with magic
1384        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1385        // ends with data and sync marker
1386        let last_data_byte = result.len() - 16;
1387        assert_eq!(
1388            &result[last_data_byte - data.len()..last_data_byte],
1389            data.as_slice()
1390        );
1391
1392        Ok(())
1393    }
1394
1395    #[test]
1396    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1397        let schema = Schema::parse_str(SCHEMA)?;
1398        let mut writer = Writer::new(&schema, Vec::new())?;
1399
1400        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1401        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1402        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1403        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1404
1405        let mut record = Record::new(&schema).unwrap();
1406        record.put("a", 27i64);
1407        record.put("b", "foo");
1408
1409        writer.append_value(record.clone())?;
1410        writer.append_value(record.clone())?;
1411        writer.flush()?;
1412        let result = writer.into_inner()?;
1413
1414        assert_eq!(result.len(), 244);
1415
1416        Ok(())
1417    }
1418
1419    #[test]
1420    fn test_avro_3881_metadata_empty_body() -> TestResult {
1421        let schema = Schema::parse_str(SCHEMA)?;
1422        let mut writer = Writer::new(&schema, Vec::new())?;
1423        writer.add_user_metadata("a".to_string(), "b")?;
1424        let result = writer.into_inner()?;
1425
1426        let reader = Reader::with_schema(&schema, &result[..])?;
1427        let mut expected = HashMap::new();
1428        expected.insert("a".to_string(), vec![b'b']);
1429        assert_eq!(reader.user_metadata(), &expected);
1430        assert_eq!(reader.into_iter().count(), 0);
1431
1432        Ok(())
1433    }
1434
1435    #[test]
1436    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1437        let schema = Schema::parse_str(SCHEMA)?;
1438        let mut writer = Writer::new(&schema, Vec::new())?;
1439
1440        let mut record = Record::new(&schema).unwrap();
1441        record.put("a", 27i64);
1442        record.put("b", "foo");
1443        writer.append_value(record.clone())?;
1444
1445        match writer
1446            .add_user_metadata("stringKey".to_string(), String::from("value2"))
1447            .map_err(Error::into_details)
1448        {
1449            Err(e @ Details::FileHeaderAlreadyWritten) => {
1450                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1451            }
1452            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1453            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1454        }
1455
1456        Ok(())
1457    }
1458
1459    #[test]
1460    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1461        let schema = Schema::parse_str(SCHEMA)?;
1462        let mut writer = Writer::new(&schema, Vec::new())?;
1463
1464        let key = "avro.stringKey".to_string();
1465        match writer
1466            .add_user_metadata(key.clone(), "value")
1467            .map_err(Error::into_details)
1468        {
1469            Err(ref e @ Details::InvalidMetadataKey(_)) => {
1470                assert_eq!(
1471                    e.to_string(),
1472                    format!(
1473                        "Metadata keys starting with 'avro.' are reserved for internal usage: {key}."
1474                    )
1475                )
1476            }
1477            Err(e) => panic!(
1478                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1479            ),
1480            Ok(_) => {
1481                panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'")
1482            }
1483        }
1484
1485        Ok(())
1486    }
1487
1488    #[test]
1489    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1490        let schema = Schema::parse_str(SCHEMA)?;
1491
1492        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1493        user_meta_data.insert(
1494            "stringKey".to_string(),
1495            Value::String("stringValue".to_string()),
1496        );
1497        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1498        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1499
1500        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1501            .writer(Vec::new())
1502            .schema(&schema)
1503            .user_metadata(user_meta_data.clone())
1504            .build()?;
1505
1506        assert_eq!(writer.user_metadata, user_meta_data);
1507
1508        Ok(())
1509    }
1510
    /// Fixture for the single-object writer tests. Its Avro schema comes from the
    /// `AvroSchema` impl for this type, and it converts into a `Value` via `From`.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }
1517
    impl AvroSchema for TestSingleObjectWriter {
        /// Parses the hand-written record schema: `a: long`, `b: double`, `c: array<string>`.
        ///
        /// NOTE(review): the record name is spelled `TestSingleObjectWrtierSerialize` (sic).
        /// The tests shown derive fingerprints from this schema, so the typo is harmless here;
        /// confirm nothing else pins the name before renaming it.
        fn get_schema() -> Schema {
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }
1546
1547    impl From<TestSingleObjectWriter> for Value {
1548        fn from(obj: TestSingleObjectWriter) -> Value {
1549            Value::Record(vec![
1550                ("a".into(), obj.a.into()),
1551                ("b".into(), obj.b.into()),
1552                (
1553                    "c".into(),
1554                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1555                ),
1556            ])
1557        }
1558    }
1559
1560    #[test]
1561    fn test_single_object_writer() -> TestResult {
1562        let mut buf: Vec<u8> = Vec::new();
1563        let obj = TestSingleObjectWriter {
1564            a: 300,
1565            b: 34.555,
1566            c: vec!["cat".into(), "dog".into()],
1567        };
1568        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1569            &TestSingleObjectWriter::get_schema(),
1570            1024,
1571        )
1572        .expect("Should resolve schema");
1573        let value = obj.into();
1574        let written_bytes = writer
1575            .write_value_ref(&value, &mut buf)
1576            .expect("Error serializing properly");
1577
1578        assert!(buf.len() > 10, "no bytes written");
1579        assert_eq!(buf.len(), written_bytes);
1580        assert_eq!(buf[0], 0xC3);
1581        assert_eq!(buf[1], 0x01);
1582        assert_eq!(
1583            &buf[2..10],
1584            &TestSingleObjectWriter::get_schema()
1585                .fingerprint::<Rabin>()
1586                .bytes[..]
1587        );
1588        let mut msg_binary = Vec::new();
1589        encode(
1590            &value,
1591            &TestSingleObjectWriter::get_schema(),
1592            &mut msg_binary,
1593        )
1594        .expect("encode should have failed by here as a dependency of any writing");
1595        assert_eq!(&buf[10..], &msg_binary[..]);
1596
1597        Ok(())
1598    }
1599
1600    #[test]
1601    fn test_single_object_writer_with_header_builder() -> TestResult {
1602        let mut buf: Vec<u8> = Vec::new();
1603        let obj = TestSingleObjectWriter {
1604            a: 300,
1605            b: 34.555,
1606            c: vec!["cat".into(), "dog".into()],
1607        };
1608        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
1609        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
1610        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
1611            &TestSingleObjectWriter::get_schema(),
1612            1024,
1613            header_builder,
1614        )
1615        .expect("Should resolve schema");
1616        let value = obj.into();
1617        writer
1618            .write_value_ref(&value, &mut buf)
1619            .expect("Error serializing properly");
1620
1621        assert_eq!(buf[0], 0x03);
1622        assert_eq!(buf[1], 0x00);
1623        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
1624        Ok(())
1625    }
1626
1627    #[test]
1628    fn test_writer_parity() -> TestResult {
1629        let obj1 = TestSingleObjectWriter {
1630            a: 300,
1631            b: 34.555,
1632            c: vec!["cat".into(), "dog".into()],
1633        };
1634
1635        let mut buf1: Vec<u8> = Vec::new();
1636        let mut buf2: Vec<u8> = Vec::new();
1637        let mut buf3: Vec<u8> = Vec::new();
1638        let mut buf4: Vec<u8> = Vec::new();
1639
1640        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1641            &TestSingleObjectWriter::get_schema(),
1642            1024,
1643        )
1644        .expect("Should resolve schema");
1645        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
1646            .expect("Resolved should pass");
1647        specific_writer
1648            .write_ref(&obj1, &mut buf1)
1649            .expect("Serialization expected");
1650        specific_writer
1651            .write_ref(&obj1, &mut buf2)
1652            .expect("Serialization expected");
1653        specific_writer
1654            .write_value(obj1.clone(), &mut buf3)
1655            .expect("Serialization expected");
1656
1657        generic_writer
1658            .write_value(obj1.into(), &mut buf4)
1659            .expect("Serialization expected");
1660
1661        assert_eq!(buf1, buf2);
1662        assert_eq!(buf2, buf3);
1663        assert_eq!(buf3, buf4);
1664
1665        Ok(())
1666    }
1667
1668    #[test]
1669    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1670        const SCHEMA: &str = r#"
1671  {
1672      "type": "record",
1673      "name": "Conference",
1674      "fields": [
1675          {"type": "string", "name": "name"},
1676          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1677      ]
1678  }"#;
1679
1680        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1681        pub struct Conference {
1682            pub name: String,
1683            pub time: Option<i64>,
1684        }
1685
1686        let conf = Conference {
1687            name: "RustConf".to_string(),
1688            time: Some(1234567890),
1689        };
1690
1691        let schema = Schema::parse_str(SCHEMA)?;
1692        let mut writer = Writer::new(&schema, Vec::new())?;
1693
1694        let bytes = writer.append_ser(conf)?;
1695
1696        assert_eq!(182, bytes);
1697
1698        Ok(())
1699    }
1700
1701    #[test]
1702    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1703        const SCHEMA: &str = r#"
1704  {
1705      "type": "record",
1706      "name": "Conference",
1707      "fields": [
1708          {"type": "string", "name": "name"},
1709          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1710      ]
1711  }"#;
1712
1713        #[derive(Debug, PartialEq, Clone, Serialize)]
1714        pub struct Conference {
1715            pub name: String,
1716            pub time: Option<f64>, // wrong type: f64 instead of i64
1717        }
1718
1719        let conf = Conference {
1720            name: "RustConf".to_string(),
1721            time: Some(12345678.90),
1722        };
1723
1724        let schema = Schema::parse_str(SCHEMA)?;
1725        let mut writer = Writer::new(&schema, Vec::new())?;
1726
1727        match writer.append_ser(conf) {
1728            Ok(bytes) => panic!("Expected an error, but got {bytes} bytes written"),
1729            Err(e) => {
1730                assert_eq!(
1731                    e.to_string(),
1732                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }): 12345678.9. Cause: Cannot find a Double schema in [Null, Long]"#
1733                );
1734            }
1735        }
1736        Ok(())
1737    }
1738
1739    #[test]
1740    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1741        const SCHEMA: &str = r#"
1742        {
1743            "type": "record",
1744            "name": "ExampleSchema",
1745            "fields": [
1746                {"name": "exampleField", "type": "string"}
1747            ]
1748        }
1749        "#;
1750
1751        #[derive(Clone, Default)]
1752        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1753
1754        impl TestBuffer {
1755            fn len(&self) -> usize {
1756                self.0.borrow().len()
1757            }
1758        }
1759
1760        impl Write for TestBuffer {
1761            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1762                self.0.borrow_mut().write(buf)
1763            }
1764
1765            fn flush(&mut self) -> std::io::Result<()> {
1766                Ok(())
1767            }
1768        }
1769
1770        let shared_buffer = TestBuffer::default();
1771
1772        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1773
1774        let schema = Schema::parse_str(SCHEMA)?;
1775
1776        let mut writer = Writer::new(&schema, buffered_writer)?;
1777
1778        let mut record = Record::new(writer.schema()).unwrap();
1779        record.put("exampleField", "value");
1780
1781        writer.append_value(record)?;
1782        writer.flush()?;
1783
1784        assert_eq!(
1785            shared_buffer.len(),
1786            151,
1787            "the test buffer was not fully written to after Writer::flush was called"
1788        );
1789
1790        Ok(())
1791    }
1792
1793    #[test]
1794    fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
1795        #[derive(Serialize)]
1796        struct Recursive {
1797            field: bool,
1798            recurse: Option<Box<Recursive>>,
1799        }
1800
1801        impl AvroSchema for Recursive {
1802            fn get_schema() -> Schema {
1803                Schema::parse_str(
1804                    r#"{
1805                    "name": "Recursive",
1806                    "type": "record",
1807                    "fields": [
1808                        { "name": "field", "type": "boolean" },
1809                        { "name": "recurse", "type": ["null", "Recursive"] }
1810                    ]
1811                }"#,
1812                )
1813                .unwrap()
1814            }
1815        }
1816
1817        let mut buffer = Vec::new();
1818        let writer = SpecificSingleObjectWriter::new()?;
1819
1820        writer.write(
1821            Recursive {
1822                field: true,
1823                recurse: Some(Box::new(Recursive {
1824                    field: false,
1825                    recurse: None,
1826                })),
1827            },
1828            &mut buffer,
1829        )?;
1830        assert_eq!(
1831            buffer,
1832            &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
1833        );
1834
1835        Ok(())
1836    }
1837
1838    #[test]
1839    fn avro_rs_310_append_unvalidated_value() -> TestResult {
1840        let schema = Schema::String;
1841        let value = Value::Int(1);
1842
1843        let mut writer = Writer::new(&schema, Vec::new())?;
1844        writer.unvalidated_append_value_ref(&value)?;
1845        writer.unvalidated_append_value(value)?;
1846        let buffer = writer.into_inner()?;
1847
1848        // Check the last two bytes for the sync marker
1849        assert_eq!(&buffer[buffer.len() - 18..buffer.len() - 16], &[2, 2]);
1850
1851        let mut writer = Writer::new(&schema, Vec::new())?;
1852        let value = Value::Int(1);
1853        let err = writer.append_value_ref(&value).unwrap_err();
1854        assert_eq!(
1855            err.to_string(),
1856            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1857        );
1858        let err = writer.append_value(value).unwrap_err();
1859        assert_eq!(
1860            err.to_string(),
1861            "Value Int(1) does not match schema String: Reason: Unsupported value-schema combination! Value: Int(1), schema: String"
1862        );
1863
1864        Ok(())
1865    }
1866}