apache_avro/
writer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic handling writing in Avro format at user level.
19use crate::{
20    encode::{encode, encode_internal, encode_to_vec},
21    rabin::Rabin,
22    schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
23    ser_schema::SchemaAwareWriteSerializer,
24    types::Value,
25    AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, marker::PhantomData};
29
/// Default value of `Writer::block_size`: once the pending (uncompressed) block buffer reaches
/// this many bytes, the writer flushes it as a block.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes written at the very start of the container header: `O`, `b`, `j`, version `1`.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
32
33/// Main interface for writing Avro formatted values.
34#[derive(bon::Builder)]
35pub struct Writer<'a, W: Write> {
36    schema: &'a Schema,
37    writer: W,
38    #[builder(skip)]
39    resolved_schema: Option<ResolvedSchema<'a>>,
40    #[builder(default = Codec::Null)]
41    codec: Codec,
42    #[builder(default = DEFAULT_BLOCK_SIZE)]
43    block_size: usize,
44    #[builder(skip = Vec::with_capacity(block_size))]
45    buffer: Vec<u8>,
46    #[builder(skip)]
47    num_values: usize,
48    #[builder(default = generate_sync_marker())]
49    marker: [u8; 16],
50    #[builder(default = false)]
51    has_header: bool,
52    #[builder(default)]
53    user_metadata: HashMap<String, Value>,
54}
55
56impl<'a, W: Write> Writer<'a, W> {
57    /// Creates a `Writer` given a `Schema` and something implementing the `io::Write` trait to write
58    /// to.
59    /// No compression `Codec` will be used.
60    pub fn new(schema: &'a Schema, writer: W) -> Self {
61        Writer::with_codec(schema, writer, Codec::Null)
62    }
63
64    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
65    /// `io::Write` trait to write to.
66    pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
67        let mut w = Self::builder()
68            .schema(schema)
69            .writer(writer)
70            .codec(codec)
71            .build();
72        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
73        w
74    }
75
76    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
77    /// `io::Write` trait to write to.
78    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
79    /// be provided in `schemata`.
80    pub fn with_schemata(
81        schema: &'a Schema,
82        schemata: Vec<&'a Schema>,
83        writer: W,
84        codec: Codec,
85    ) -> Self {
86        let mut w = Self::builder()
87            .schema(schema)
88            .writer(writer)
89            .codec(codec)
90            .build();
91        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
92        w
93    }
94
95    /// Creates a `Writer` that will append values to already populated
96    /// `std::io::Write` using the provided `marker`
97    /// No compression `Codec` will be used.
98    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
99        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
100    }
101
102    /// Creates a `Writer` that will append values to already populated
103    /// `std::io::Write` using the provided `marker`
104    pub fn append_to_with_codec(
105        schema: &'a Schema,
106        writer: W,
107        codec: Codec,
108        marker: [u8; 16],
109    ) -> Self {
110        let mut w = Self::builder()
111            .schema(schema)
112            .writer(writer)
113            .codec(codec)
114            .marker(marker)
115            .has_header(true)
116            .build();
117        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
118        w
119    }
120
121    /// Creates a `Writer` that will append values to already populated
122    /// `std::io::Write` using the provided `marker`
123    pub fn append_to_with_codec_schemata(
124        schema: &'a Schema,
125        schemata: Vec<&'a Schema>,
126        writer: W,
127        codec: Codec,
128        marker: [u8; 16],
129    ) -> Self {
130        let mut w = Self::builder()
131            .schema(schema)
132            .writer(writer)
133            .codec(codec)
134            .marker(marker)
135            .has_header(true)
136            .build();
137        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
138        w
139    }
140
141    /// Get a reference to the `Schema` associated to a `Writer`.
142    pub fn schema(&self) -> &'a Schema {
143        self.schema
144    }
145
146    /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
147    /// schema validation.
148    ///
149    /// Return the number of bytes written (it might be 0, see below).
150    ///
151    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
152    /// internal buffering for performance reasons. If you want to be sure the value has been
153    /// written, then call [`flush`](struct.Writer.html#method.flush).
154    pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
155        let n = self.maybe_write_header()?;
156
157        let avro = value.into();
158        self.append_value_ref(&avro).map(|m| m + n)
159    }
160
161    /// Append a compatible value to a `Writer`, also performing schema validation.
162    ///
163    /// Return the number of bytes written (it might be 0, see below).
164    ///
165    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
166    /// internal buffering for performance reasons. If you want to be sure the value has been
167    /// written, then call [`flush`](struct.Writer.html#method.flush).
168    pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
169        let n = self.maybe_write_header()?;
170
171        // Lazy init for users using the builder pattern with error throwing
172        match self.resolved_schema {
173            Some(ref rs) => {
174                write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
175                self.num_values += 1;
176
177                if self.buffer.len() >= self.block_size {
178                    return self.flush().map(|b| b + n);
179                }
180
181                Ok(n)
182            }
183            None => {
184                let rs = ResolvedSchema::try_from(self.schema)?;
185                self.resolved_schema = Some(rs);
186                self.append_value_ref(value)
187            }
188        }
189    }
190
191    /// Append anything implementing the `Serialize` trait to a `Writer` for
192    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
193    /// validation.
194    ///
195    /// Return the number of bytes written.
196    ///
197    /// **NOTE** This function is not guaranteed to perform any actual write, since it relies on
198    /// internal buffering for performance reasons. If you want to be sure the value has been
199    /// written, then call [`flush`](struct.Writer.html#method.flush).
200    pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
201        let n = self.maybe_write_header()?;
202
203        match self.resolved_schema {
204            Some(ref rs) => {
205                let mut serializer = SchemaAwareWriteSerializer::new(
206                    &mut self.buffer,
207                    self.schema,
208                    rs.get_names(),
209                    None,
210                );
211                value.serialize(&mut serializer)?;
212                self.num_values += 1;
213
214                if self.buffer.len() >= self.block_size {
215                    return self.flush().map(|b| b + n);
216                }
217
218                Ok(n)
219            }
220            None => {
221                let rs = ResolvedSchema::try_from(self.schema)?;
222                self.resolved_schema = Some(rs);
223                self.append_ser(value)
224            }
225        }
226    }
227
228    /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
229    /// trait), also performing schema validation.
230    ///
231    /// Return the number of bytes written.
232    ///
233    /// **NOTE** This function forces the written data to be flushed (an implicit
234    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
235    pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
236    where
237        I: IntoIterator<Item = T>,
238    {
239        /*
240        https://github.com/rust-lang/rfcs/issues/811 :(
241        let mut stream = values
242            .filter_map(|value| value.serialize(&mut self.serializer).ok())
243            .map(|value| value.encode(self.schema))
244            .collect::<Option<Vec<_>>>()
245            .ok_or_else(|| err_msg("value does not match given schema"))?
246            .into_iter()
247            .fold(Vec::new(), |mut acc, stream| {
248                num_values += 1;
249                acc.extend(stream); acc
250            });
251        */
252
253        let mut num_bytes = 0;
254        for value in values {
255            num_bytes += self.append(value)?;
256        }
257        num_bytes += self.flush()?;
258
259        Ok(num_bytes)
260    }
261
262    /// Extend a `Writer` with an `Iterator` of anything implementing the `Serialize` trait for
263    /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
264    /// validation.
265    ///
266    /// Return the number of bytes written.
267    ///
268    /// **NOTE** This function forces the written data to be flushed (an implicit
269    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
270    pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
271    where
272        I: IntoIterator<Item = T>,
273    {
274        /*
275        https://github.com/rust-lang/rfcs/issues/811 :(
276        let mut stream = values
277            .filter_map(|value| value.serialize(&mut self.serializer).ok())
278            .map(|value| value.encode(self.schema))
279            .collect::<Option<Vec<_>>>()
280            .ok_or_else(|| err_msg("value does not match given schema"))?
281            .into_iter()
282            .fold(Vec::new(), |mut acc, stream| {
283                num_values += 1;
284                acc.extend(stream); acc
285            });
286        */
287
288        let mut num_bytes = 0;
289        for value in values {
290            num_bytes += self.append_ser(value)?;
291        }
292        num_bytes += self.flush()?;
293
294        Ok(num_bytes)
295    }
296
297    /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
298    /// validation on each value appended.
299    ///
300    /// Return the number of bytes written.
301    ///
302    /// **NOTE** This function forces the written data to be flushed (an implicit
303    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
304    pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
305        let mut num_bytes = 0;
306        for value in values {
307            num_bytes += self.append_value_ref(value)?;
308        }
309        num_bytes += self.flush()?;
310
311        Ok(num_bytes)
312    }
313
314    /// Flush the content appended to a `Writer`. Call this function to make sure all the content
315    /// has been written before releasing the `Writer`.
316    ///
317    /// Return the number of bytes written.
318    pub fn flush(&mut self) -> AvroResult<usize> {
319        if self.num_values == 0 {
320            return Ok(0);
321        }
322
323        self.codec.compress(&mut self.buffer)?;
324
325        let num_values = self.num_values;
326        let stream_len = self.buffer.len();
327
328        let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
329            + self.append_raw(&stream_len.into(), &Schema::Long)?
330            + self
331                .writer
332                .write(self.buffer.as_ref())
333                .map_err(Error::WriteBytes)?
334            + self.append_marker()?;
335
336        self.buffer.clear();
337        self.num_values = 0;
338
339        self.writer.flush().map_err(Error::FlushWriter)?;
340
341        Ok(num_bytes)
342    }
343
344    /// Return what the `Writer` is writing to, consuming the `Writer` itself.
345    ///
346    /// **NOTE** This function forces the written data to be flushed (an implicit
347    /// call to [`flush`](struct.Writer.html#method.flush) is performed).
348    pub fn into_inner(mut self) -> AvroResult<W> {
349        self.maybe_write_header()?;
350        self.flush()?;
351        Ok(self.writer)
352    }
353
354    /// Generate and append synchronization marker to the payload.
355    fn append_marker(&mut self) -> AvroResult<usize> {
356        // using .writer.write directly to avoid mutable borrow of self
357        // with ref borrowing of self.marker
358        self.writer.write(&self.marker).map_err(Error::WriteMarker)
359    }
360
361    /// Append a raw Avro Value to the payload avoiding to encode it again.
362    fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
363        self.append_bytes(encode_to_vec(value, schema)?.as_ref())
364    }
365
366    /// Append pure bytes to the payload.
367    fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
368        self.writer.write(bytes).map_err(Error::WriteBytes)
369    }
370
371    /// Adds custom metadata to the file.
372    /// This method could be used only before adding the first record to the writer.
373    pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
374        if !self.has_header {
375            if key.starts_with("avro.") {
376                return Err(Error::InvalidMetadataKey(key));
377            }
378            self.user_metadata
379                .insert(key, Value::Bytes(value.as_ref().to_vec()));
380            Ok(())
381        } else {
382            Err(Error::FileHeaderAlreadyWritten)
383        }
384    }
385
386    /// Create an Avro header based on schema, codec and sync marker.
387    fn header(&self) -> Result<Vec<u8>, Error> {
388        let schema_bytes = serde_json::to_string(self.schema)
389            .map_err(Error::ConvertJsonToString)?
390            .into_bytes();
391
392        let mut metadata = HashMap::with_capacity(2);
393        metadata.insert("avro.schema", Value::Bytes(schema_bytes));
394        metadata.insert("avro.codec", self.codec.into());
395        match self.codec {
396            #[cfg(feature = "bzip")]
397            Codec::Bzip2(settings) => {
398                metadata.insert(
399                    "avro.codec.compression_level",
400                    Value::Bytes(vec![settings.compression_level]),
401                );
402            }
403            #[cfg(feature = "xz")]
404            Codec::Xz(settings) => {
405                metadata.insert(
406                    "avro.codec.compression_level",
407                    Value::Bytes(vec![settings.compression_level]),
408                );
409            }
410            #[cfg(feature = "zstandard")]
411            Codec::Zstandard(settings) => {
412                metadata.insert(
413                    "avro.codec.compression_level",
414                    Value::Bytes(vec![settings.compression_level]),
415                );
416            }
417            _ => {}
418        }
419
420        for (k, v) in &self.user_metadata {
421            metadata.insert(k.as_str(), v.clone());
422        }
423
424        let mut header = Vec::new();
425        header.extend_from_slice(AVRO_OBJECT_HEADER);
426        encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
427        header.extend_from_slice(&self.marker);
428
429        Ok(header)
430    }
431
432    fn maybe_write_header(&mut self) -> AvroResult<usize> {
433        if !self.has_header {
434            let header = self.header()?;
435            let n = self.append_bytes(header.as_ref())?;
436            self.has_header = true;
437            Ok(n)
438        } else {
439            Ok(0)
440        }
441    }
442}
443
444/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also performing
445/// schema validation.
446///
447/// This is an internal function which gets the bytes buffer where to write as parameter instead of
448/// creating a new one like `to_avro_datum`.
449fn write_avro_datum<T: Into<Value>, W: Write>(
450    schema: &Schema,
451    value: T,
452    writer: &mut W,
453) -> Result<(), Error> {
454    let avro = value.into();
455    if !avro.validate(schema) {
456        return Err(Error::Validation);
457    }
458    encode(&avro, schema, writer)?;
459    Ok(())
460}
461
462fn write_avro_datum_schemata<T: Into<Value>>(
463    schema: &Schema,
464    schemata: Vec<&Schema>,
465    value: T,
466    buffer: &mut Vec<u8>,
467) -> AvroResult<usize> {
468    let avro = value.into();
469    let rs = ResolvedSchema::try_from(schemata)?;
470    let names = rs.get_names();
471    let enclosing_namespace = schema.namespace();
472    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
473        return Err(Error::Validation);
474    }
475    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
476}
477
478/// Writer that encodes messages according to the single object encoding v1 spec
479/// Uses an API similar to the current File Writer
480/// Writes all object bytes at once, and drains internal buffer
481pub struct GenericSingleObjectWriter {
482    buffer: Vec<u8>,
483    resolved: ResolvedOwnedSchema,
484}
485
486impl GenericSingleObjectWriter {
487    pub fn new_with_capacity(
488        schema: &Schema,
489        initial_buffer_cap: usize,
490    ) -> AvroResult<GenericSingleObjectWriter> {
491        let fingerprint = schema.fingerprint::<Rabin>();
492        let mut buffer = Vec::with_capacity(initial_buffer_cap);
493        let header = [
494            0xC3,
495            0x01,
496            fingerprint.bytes[0],
497            fingerprint.bytes[1],
498            fingerprint.bytes[2],
499            fingerprint.bytes[3],
500            fingerprint.bytes[4],
501            fingerprint.bytes[5],
502            fingerprint.bytes[6],
503            fingerprint.bytes[7],
504        ];
505        buffer.extend_from_slice(&header);
506
507        Ok(GenericSingleObjectWriter {
508            buffer,
509            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
510        })
511    }
512
513    /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
514    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
515        if self.buffer.len() != 10 {
516            Err(Error::IllegalSingleObjectWriterState)
517        } else {
518            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
519            writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
520            let len = self.buffer.len();
521            self.buffer.truncate(10);
522            Ok(len)
523        }
524    }
525
526    /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
527    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
528        self.write_value_ref(&v, writer)
529    }
530}
531
532/// Writer that encodes messages according to the single object encoding v1 spec
533pub struct SpecificSingleObjectWriter<T>
534where
535    T: AvroSchema,
536{
537    inner: GenericSingleObjectWriter,
538    schema: Schema,
539    header_written: bool,
540    _model: PhantomData<T>,
541}
542
543impl<T> SpecificSingleObjectWriter<T>
544where
545    T: AvroSchema,
546{
547    pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
548        let schema = T::get_schema();
549        Ok(SpecificSingleObjectWriter {
550            inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
551            schema,
552            header_written: false,
553            _model: PhantomData,
554        })
555    }
556}
557
558impl<T> SpecificSingleObjectWriter<T>
559where
560    T: AvroSchema + Into<Value>,
561{
562    /// Write the `Into<Value>` to the provided Write object. Returns a result with the number
563    /// of bytes written including the header
564    pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
565        let v: Value = data.into();
566        self.inner.write_value_ref(&v, writer)
567    }
568}
569
570impl<T> SpecificSingleObjectWriter<T>
571where
572    T: AvroSchema + Serialize,
573{
574    /// Write the referenced `Serialize` object to the provided Write object. Returns a result with
575    /// the number of bytes written including the header
576    pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
577        let mut bytes_written: usize = 0;
578
579        if !self.header_written {
580            bytes_written += writer
581                .write(self.inner.buffer.as_slice())
582                .map_err(Error::WriteBytes)?;
583            self.header_written = true;
584        }
585
586        let names: HashMap<Name, &Schema> = HashMap::new();
587        let mut serializer = SchemaAwareWriteSerializer::new(writer, &self.schema, &names, None);
588        bytes_written += data.serialize(&mut serializer)?;
589
590        Ok(bytes_written)
591    }
592
593    /// Write the Serialize object to the provided Write object. Returns a result with the number
594    /// of bytes written including the header
595    pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
596        self.write_ref(&data, writer)
597    }
598}
599
600fn write_value_ref_resolved(
601    schema: &Schema,
602    resolved_schema: &ResolvedSchema,
603    value: &Value,
604    buffer: &mut Vec<u8>,
605) -> AvroResult<usize> {
606    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
607        Some(reason) => Err(Error::ValidationWithReason {
608            value: value.clone(),
609            schema: schema.clone(),
610            reason,
611        }),
612        None => encode_internal(
613            value,
614            schema,
615            resolved_schema.get_names(),
616            &schema.namespace(),
617            buffer,
618        ),
619    }
620}
621
622fn write_value_ref_owned_resolved(
623    resolved_schema: &ResolvedOwnedSchema,
624    value: &Value,
625    buffer: &mut Vec<u8>,
626) -> AvroResult<()> {
627    let root_schema = resolved_schema.get_root_schema();
628    if let Some(reason) = value.validate_internal(
629        root_schema,
630        resolved_schema.get_names(),
631        &root_schema.namespace(),
632    ) {
633        return Err(Error::ValidationWithReason {
634            value: value.clone(),
635            schema: root_schema.clone(),
636            reason,
637        });
638    }
639    encode_internal(
640        value,
641        root_schema,
642        resolved_schema.get_names(),
643        &root_schema.namespace(),
644        buffer,
645    )?;
646    Ok(())
647}
648
649/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
650/// performing schema validation.
651///
652/// **NOTE** This function has a quite small niche of usage and does NOT generate headers and sync
653/// markers; use [`Writer`](struct.Writer.html) to be fully Avro-compatible if you don't know what
654/// you are doing, instead.
655pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
656    let mut buffer = Vec::new();
657    write_avro_datum(schema, value, &mut buffer)?;
658    Ok(buffer)
659}
660
661/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
662/// performing schema validation.
663/// If the provided `schema` is incomplete then its dependencies must be
664/// provided in `schemata`
665pub fn to_avro_datum_schemata<T: Into<Value>>(
666    schema: &Schema,
667    schemata: Vec<&Schema>,
668    value: T,
669) -> AvroResult<Vec<u8>> {
670    let mut buffer = Vec::new();
671    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
672    Ok(buffer)
673}
674
675#[cfg(not(target_arch = "wasm32"))]
676fn generate_sync_marker() -> [u8; 16] {
677    let mut marker = [0_u8; 16];
678    std::iter::repeat_with(rand::random)
679        .take(16)
680        .enumerate()
681        .for_each(|(i, n)| marker[i] = n);
682    marker
683}
684
/// Produce a random 16-byte sync marker from four `quad_rand::rand` values, each laid out
/// big-endian.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for chunk in marker.chunks_exact_mut(4) {
        chunk.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
695
696#[cfg(test)]
697mod tests {
698    use std::{cell::RefCell, rc::Rc};
699
700    use super::*;
701    use crate::{
702        decimal::Decimal,
703        duration::{Days, Duration, Millis, Months},
704        schema::{DecimalSchema, FixedSchema, Name},
705        types::Record,
706        util::zig_i64,
707        Reader,
708    };
709    use pretty_assertions::assert_eq;
710    use serde::{Deserialize, Serialize};
711
712    use crate::codec::DeflateSettings;
713    use apache_avro_test_helper::TestResult;
714
715    const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
716
717    const SCHEMA: &str = r#"
718    {
719      "type": "record",
720      "name": "test",
721      "fields": [
722        {
723          "name": "a",
724          "type": "long",
725          "default": 42
726        },
727        {
728          "name": "b",
729          "type": "string"
730        }
731      ]
732    }
733    "#;
734    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
735
736    #[test]
737    fn test_to_avro_datum() -> TestResult {
738        let schema = Schema::parse_str(SCHEMA)?;
739        let mut record = Record::new(&schema).unwrap();
740        record.put("a", 27i64);
741        record.put("b", "foo");
742
743        let mut expected = Vec::new();
744        zig_i64(27, &mut expected)?;
745        zig_i64(3, &mut expected)?;
746        expected.extend([b'f', b'o', b'o']);
747
748        assert_eq!(to_avro_datum(&schema, record)?, expected);
749
750        Ok(())
751    }
752
753    #[test]
754    fn test_union_not_null() -> TestResult {
755        let schema = Schema::parse_str(UNION_SCHEMA)?;
756        let union = Value::Union(1, Box::new(Value::Long(3)));
757
758        let mut expected = Vec::new();
759        zig_i64(1, &mut expected)?;
760        zig_i64(3, &mut expected)?;
761
762        assert_eq!(to_avro_datum(&schema, union)?, expected);
763
764        Ok(())
765    }
766
767    #[test]
768    fn test_union_null() -> TestResult {
769        let schema = Schema::parse_str(UNION_SCHEMA)?;
770        let union = Value::Union(0, Box::new(Value::Null));
771
772        let mut expected = Vec::new();
773        zig_i64(0, &mut expected)?;
774
775        assert_eq!(to_avro_datum(&schema, union)?, expected);
776
777        Ok(())
778    }
779
780    fn logical_type_test<T: Into<Value> + Clone>(
781        schema_str: &'static str,
782
783        expected_schema: &Schema,
784        value: Value,
785
786        raw_schema: &Schema,
787        raw_value: T,
788    ) -> TestResult {
789        let schema = Schema::parse_str(schema_str)?;
790        assert_eq!(&schema, expected_schema);
791        // The serialized format should be the same as the schema.
792        let ser = to_avro_datum(&schema, value.clone())?;
793        let raw_ser = to_avro_datum(raw_schema, raw_value)?;
794        assert_eq!(ser, raw_ser);
795
796        // Should deserialize from the schema into the logical type.
797        let mut r = ser.as_slice();
798        let de = crate::from_avro_datum(&schema, &mut r, None)?;
799        assert_eq!(de, value);
800        Ok(())
801    }
802
803    #[test]
804    fn date() -> TestResult {
805        logical_type_test(
806            r#"{"type": "int", "logicalType": "date"}"#,
807            &Schema::Date,
808            Value::Date(1_i32),
809            &Schema::Int,
810            1_i32,
811        )
812    }
813
814    #[test]
815    fn time_millis() -> TestResult {
816        logical_type_test(
817            r#"{"type": "int", "logicalType": "time-millis"}"#,
818            &Schema::TimeMillis,
819            Value::TimeMillis(1_i32),
820            &Schema::Int,
821            1_i32,
822        )
823    }
824
825    #[test]
826    fn time_micros() -> TestResult {
827        logical_type_test(
828            r#"{"type": "long", "logicalType": "time-micros"}"#,
829            &Schema::TimeMicros,
830            Value::TimeMicros(1_i64),
831            &Schema::Long,
832            1_i64,
833        )
834    }
835
836    #[test]
837    fn timestamp_millis() -> TestResult {
838        logical_type_test(
839            r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
840            &Schema::TimestampMillis,
841            Value::TimestampMillis(1_i64),
842            &Schema::Long,
843            1_i64,
844        )
845    }
846
847    #[test]
848    fn timestamp_micros() -> TestResult {
849        logical_type_test(
850            r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
851            &Schema::TimestampMicros,
852            Value::TimestampMicros(1_i64),
853            &Schema::Long,
854            1_i64,
855        )
856    }
857
858    #[test]
859    fn decimal_fixed() -> TestResult {
860        let size = 30;
861        let inner = Schema::Fixed(FixedSchema {
862            name: Name::new("decimal")?,
863            aliases: None,
864            doc: None,
865            size,
866            default: None,
867            attributes: Default::default(),
868        });
869        let value = vec![0u8; size];
870        logical_type_test(
871            r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
872            &Schema::Decimal(DecimalSchema {
873                precision: 20,
874                scale: 5,
875                inner: Box::new(inner.clone()),
876            }),
877            Value::Decimal(Decimal::from(value.clone())),
878            &inner,
879            Value::Fixed(size, value),
880        )
881    }
882
883    #[test]
884    fn decimal_bytes() -> TestResult {
885        let inner = Schema::Bytes;
886        let value = vec![0u8; 10];
887        logical_type_test(
888            r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
889            &Schema::Decimal(DecimalSchema {
890                precision: 4,
891                scale: 3,
892                inner: Box::new(inner.clone()),
893            }),
894            Value::Decimal(Decimal::from(value.clone())),
895            &inner,
896            value,
897        )
898    }
899
900    #[test]
901    fn duration() -> TestResult {
902        let inner = Schema::Fixed(FixedSchema {
903            name: Name::new("duration")?,
904            aliases: None,
905            doc: None,
906            size: 12,
907            default: None,
908            attributes: Default::default(),
909        });
910        let value = Value::Duration(Duration::new(
911            Months::new(256),
912            Days::new(512),
913            Millis::new(1024),
914        ));
915        logical_type_test(
916            r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
917            &Schema::Duration,
918            value,
919            &inner,
920            Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
921        )
922    }
923
924    #[test]
925    fn test_writer_append() -> TestResult {
926        let schema = Schema::parse_str(SCHEMA)?;
927        let mut writer = Writer::new(&schema, Vec::new());
928
929        let mut record = Record::new(&schema).unwrap();
930        record.put("a", 27i64);
931        record.put("b", "foo");
932
933        let n1 = writer.append(record.clone())?;
934        let n2 = writer.append(record.clone())?;
935        let n3 = writer.flush()?;
936        let result = writer.into_inner()?;
937
938        assert_eq!(n1 + n2 + n3, result.len());
939
940        let mut data = Vec::new();
941        zig_i64(27, &mut data)?;
942        zig_i64(3, &mut data)?;
943        data.extend(b"foo");
944        data.extend(data.clone());
945
946        // starts with magic
947        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
948        // ends with data and sync marker
949        let last_data_byte = result.len() - 16;
950        assert_eq!(
951            &result[last_data_byte - data.len()..last_data_byte],
952            data.as_slice()
953        );
954
955        Ok(())
956    }
957
958    #[test]
959    fn test_writer_extend() -> TestResult {
960        let schema = Schema::parse_str(SCHEMA)?;
961        let mut writer = Writer::new(&schema, Vec::new());
962
963        let mut record = Record::new(&schema).unwrap();
964        record.put("a", 27i64);
965        record.put("b", "foo");
966        let record_copy = record.clone();
967        let records = vec![record, record_copy];
968
969        let n1 = writer.extend(records)?;
970        let n2 = writer.flush()?;
971        let result = writer.into_inner()?;
972
973        assert_eq!(n1 + n2, result.len());
974
975        let mut data = Vec::new();
976        zig_i64(27, &mut data)?;
977        zig_i64(3, &mut data)?;
978        data.extend(b"foo");
979        data.extend(data.clone());
980
981        // starts with magic
982        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
983        // ends with data and sync marker
984        let last_data_byte = result.len() - 16;
985        assert_eq!(
986            &result[last_data_byte - data.len()..last_data_byte],
987            data.as_slice()
988        );
989
990        Ok(())
991    }
992
993    #[derive(Debug, Clone, Deserialize, Serialize)]
994    struct TestSerdeSerialize {
995        a: i64,
996        b: String,
997    }
998
999    #[test]
1000    fn test_writer_append_ser() -> TestResult {
1001        let schema = Schema::parse_str(SCHEMA)?;
1002        let mut writer = Writer::new(&schema, Vec::new());
1003
1004        let record = TestSerdeSerialize {
1005            a: 27,
1006            b: "foo".to_owned(),
1007        };
1008
1009        let n1 = writer.append_ser(record)?;
1010        let n2 = writer.flush()?;
1011        let result = writer.into_inner()?;
1012
1013        assert_eq!(n1 + n2, result.len());
1014
1015        let mut data = Vec::new();
1016        zig_i64(27, &mut data)?;
1017        zig_i64(3, &mut data)?;
1018        data.extend(b"foo");
1019
1020        // starts with magic
1021        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1022        // ends with data and sync marker
1023        let last_data_byte = result.len() - 16;
1024        assert_eq!(
1025            &result[last_data_byte - data.len()..last_data_byte],
1026            data.as_slice()
1027        );
1028
1029        Ok(())
1030    }
1031
1032    #[test]
1033    fn test_writer_extend_ser() -> TestResult {
1034        let schema = Schema::parse_str(SCHEMA)?;
1035        let mut writer = Writer::new(&schema, Vec::new());
1036
1037        let record = TestSerdeSerialize {
1038            a: 27,
1039            b: "foo".to_owned(),
1040        };
1041        let record_copy = record.clone();
1042        let records = vec![record, record_copy];
1043
1044        let n1 = writer.extend_ser(records)?;
1045        let n2 = writer.flush()?;
1046        let result = writer.into_inner()?;
1047
1048        assert_eq!(n1 + n2, result.len());
1049
1050        let mut data = Vec::new();
1051        zig_i64(27, &mut data)?;
1052        zig_i64(3, &mut data)?;
1053        data.extend(b"foo");
1054        data.extend(data.clone());
1055
1056        // starts with magic
1057        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1058        // ends with data and sync marker
1059        let last_data_byte = result.len() - 16;
1060        assert_eq!(
1061            &result[last_data_byte - data.len()..last_data_byte],
1062            data.as_slice()
1063        );
1064
1065        Ok(())
1066    }
1067
1068    fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1069        Writer::with_codec(
1070            schema,
1071            Vec::new(),
1072            Codec::Deflate(DeflateSettings::default()),
1073        )
1074    }
1075
1076    fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1077        Writer::builder()
1078            .writer(Vec::new())
1079            .schema(schema)
1080            .codec(Codec::Deflate(DeflateSettings::default()))
1081            .block_size(100)
1082            .build()
1083    }
1084
1085    fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1086        let mut record = Record::new(schema).unwrap();
1087        record.put("a", 27i64);
1088        record.put("b", "foo");
1089
1090        let n1 = writer.append(record.clone())?;
1091        let n2 = writer.append(record.clone())?;
1092        let n3 = writer.flush()?;
1093        let result = writer.into_inner()?;
1094
1095        assert_eq!(n1 + n2 + n3, result.len());
1096
1097        let mut data = Vec::new();
1098        zig_i64(27, &mut data)?;
1099        zig_i64(3, &mut data)?;
1100        data.extend(b"foo");
1101        data.extend(data.clone());
1102        Codec::Deflate(DeflateSettings::default()).compress(&mut data)?;
1103
1104        // starts with magic
1105        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1106        // ends with data and sync marker
1107        let last_data_byte = result.len() - 16;
1108        assert_eq!(
1109            &result[last_data_byte - data.len()..last_data_byte],
1110            data.as_slice()
1111        );
1112
1113        Ok(())
1114    }
1115
1116    #[test]
1117    fn test_writer_with_codec() -> TestResult {
1118        let schema = Schema::parse_str(SCHEMA)?;
1119        let writer = make_writer_with_codec(&schema);
1120        check_writer(writer, &schema)
1121    }
1122
1123    #[test]
1124    fn test_writer_with_builder() -> TestResult {
1125        let schema = Schema::parse_str(SCHEMA)?;
1126        let writer = make_writer_with_builder(&schema);
1127        check_writer(writer, &schema)
1128    }
1129
1130    #[test]
1131    fn test_logical_writer() -> TestResult {
1132        const LOGICAL_TYPE_SCHEMA: &str = r#"
1133        {
1134          "type": "record",
1135          "name": "logical_type_test",
1136          "fields": [
1137            {
1138              "name": "a",
1139              "type": [
1140                "null",
1141                {
1142                  "type": "long",
1143                  "logicalType": "timestamp-micros"
1144                }
1145              ]
1146            }
1147          ]
1148        }
1149        "#;
1150        let codec = Codec::Deflate(DeflateSettings::default());
1151        let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1152        let mut writer = Writer::builder()
1153            .schema(&schema)
1154            .codec(codec)
1155            .writer(Vec::new())
1156            .build();
1157
1158        let mut record1 = Record::new(&schema).unwrap();
1159        record1.put(
1160            "a",
1161            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1162        );
1163
1164        let mut record2 = Record::new(&schema).unwrap();
1165        record2.put("a", Value::Union(0, Box::new(Value::Null)));
1166
1167        let n1 = writer.append(record1)?;
1168        let n2 = writer.append(record2)?;
1169        let n3 = writer.flush()?;
1170        let result = writer.into_inner()?;
1171
1172        assert_eq!(n1 + n2 + n3, result.len());
1173
1174        let mut data = Vec::new();
1175        // byte indicating not null
1176        zig_i64(1, &mut data)?;
1177        zig_i64(1234, &mut data)?;
1178
1179        // byte indicating null
1180        zig_i64(0, &mut data)?;
1181        codec.compress(&mut data)?;
1182
1183        // starts with magic
1184        assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1185        // ends with data and sync marker
1186        let last_data_byte = result.len() - 16;
1187        assert_eq!(
1188            &result[last_data_byte - data.len()..last_data_byte],
1189            data.as_slice()
1190        );
1191
1192        Ok(())
1193    }
1194
1195    #[test]
1196    fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1197        let schema = Schema::parse_str(SCHEMA)?;
1198        let mut writer = Writer::new(&schema, Vec::new());
1199
1200        writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1201        writer.add_user_metadata("strKey".to_string(), "strValue")?;
1202        writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1203        writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1204
1205        let mut record = Record::new(&schema).unwrap();
1206        record.put("a", 27i64);
1207        record.put("b", "foo");
1208
1209        writer.append(record.clone())?;
1210        writer.append(record.clone())?;
1211        writer.flush()?;
1212        let result = writer.into_inner()?;
1213
1214        assert_eq!(result.len(), 260);
1215
1216        Ok(())
1217    }
1218
1219    #[test]
1220    fn test_avro_3881_metadata_empty_body() -> TestResult {
1221        let schema = Schema::parse_str(SCHEMA)?;
1222        let mut writer = Writer::new(&schema, Vec::new());
1223        writer.add_user_metadata("a".to_string(), "b")?;
1224        let result = writer.into_inner()?;
1225
1226        let reader = Reader::with_schema(&schema, &result[..])?;
1227        let mut expected = HashMap::new();
1228        expected.insert("a".to_string(), vec![b'b']);
1229        assert_eq!(reader.user_metadata(), &expected);
1230        assert_eq!(reader.into_iter().count(), 0);
1231
1232        Ok(())
1233    }
1234
1235    #[test]
1236    fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1237        let schema = Schema::parse_str(SCHEMA)?;
1238        let mut writer = Writer::new(&schema, Vec::new());
1239
1240        let mut record = Record::new(&schema).unwrap();
1241        record.put("a", 27i64);
1242        record.put("b", "foo");
1243        writer.append(record.clone())?;
1244
1245        match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1246            Err(e @ Error::FileHeaderAlreadyWritten) => {
1247                assert_eq!(e.to_string(), "The file metadata is already flushed.")
1248            }
1249            Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1250            Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1251        }
1252
1253        Ok(())
1254    }
1255
1256    #[test]
1257    fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1258        let schema = Schema::parse_str(SCHEMA)?;
1259        let mut writer = Writer::new(&schema, Vec::new());
1260
1261        let key = "avro.stringKey".to_string();
1262        match writer.add_user_metadata(key.clone(), "value") {
1263            Err(ref e @ Error::InvalidMetadataKey(_)) => {
1264                assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1265            }
1266            Err(e) => panic!(
1267                "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1268            ),
1269            Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1270        }
1271
1272        Ok(())
1273    }
1274
1275    #[test]
1276    fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1277        let schema = Schema::parse_str(SCHEMA)?;
1278
1279        let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1280        user_meta_data.insert(
1281            "stringKey".to_string(),
1282            Value::String("stringValue".to_string()),
1283        );
1284        user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1285        user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1286
1287        let writer: Writer<'_, Vec<u8>> = Writer::builder()
1288            .writer(Vec::new())
1289            .schema(&schema)
1290            .user_metadata(user_meta_data.clone())
1291            .build();
1292
1293        assert_eq!(writer.user_metadata, user_meta_data);
1294
1295        Ok(())
1296    }
1297
1298    #[derive(Serialize, Clone)]
1299    struct TestSingleObjectWriter {
1300        a: i64,
1301        b: f64,
1302        c: Vec<String>,
1303    }
1304
1305    impl AvroSchema for TestSingleObjectWriter {
1306        fn get_schema() -> Schema {
1307            let schema = r#"
1308            {
1309                "type":"record",
1310                "name":"TestSingleObjectWrtierSerialize",
1311                "fields":[
1312                    {
1313                        "name":"a",
1314                        "type":"long"
1315                    },
1316                    {
1317                        "name":"b",
1318                        "type":"double"
1319                    },
1320                    {
1321                        "name":"c",
1322                        "type":{
1323                            "type":"array",
1324                            "items":"string"
1325                        }
1326                    }
1327                ]
1328            }
1329            "#;
1330            Schema::parse_str(schema).unwrap()
1331        }
1332    }
1333
1334    impl From<TestSingleObjectWriter> for Value {
1335        fn from(obj: TestSingleObjectWriter) -> Value {
1336            Value::Record(vec![
1337                ("a".into(), obj.a.into()),
1338                ("b".into(), obj.b.into()),
1339                (
1340                    "c".into(),
1341                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1342                ),
1343            ])
1344        }
1345    }
1346
1347    #[test]
1348    fn test_single_object_writer() -> TestResult {
1349        let mut buf: Vec<u8> = Vec::new();
1350        let obj = TestSingleObjectWriter {
1351            a: 300,
1352            b: 34.555,
1353            c: vec!["cat".into(), "dog".into()],
1354        };
1355        let mut writer = GenericSingleObjectWriter::new_with_capacity(
1356            &TestSingleObjectWriter::get_schema(),
1357            1024,
1358        )
1359        .expect("Should resolve schema");
1360        let value = obj.into();
1361        let written_bytes = writer
1362            .write_value_ref(&value, &mut buf)
1363            .expect("Error serializing properly");
1364
1365        assert!(buf.len() > 10, "no bytes written");
1366        assert_eq!(buf.len(), written_bytes);
1367        assert_eq!(buf[0], 0xC3);
1368        assert_eq!(buf[1], 0x01);
1369        assert_eq!(
1370            &buf[2..10],
1371            &TestSingleObjectWriter::get_schema()
1372                .fingerprint::<Rabin>()
1373                .bytes[..]
1374        );
1375        let mut msg_binary = Vec::new();
1376        encode(
1377            &value,
1378            &TestSingleObjectWriter::get_schema(),
1379            &mut msg_binary,
1380        )
1381        .expect("encode should have failed by here as a dependency of any writing");
1382        assert_eq!(&buf[10..], &msg_binary[..]);
1383
1384        Ok(())
1385    }
1386
1387    #[test]
1388    fn test_writer_parity() -> TestResult {
1389        let obj1 = TestSingleObjectWriter {
1390            a: 300,
1391            b: 34.555,
1392            c: vec!["cat".into(), "dog".into()],
1393        };
1394
1395        let mut buf1: Vec<u8> = Vec::new();
1396        let mut buf2: Vec<u8> = Vec::new();
1397        let mut buf3: Vec<u8> = Vec::new();
1398
1399        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1400            &TestSingleObjectWriter::get_schema(),
1401            1024,
1402        )
1403        .expect("Should resolve schema");
1404        let mut specific_writer =
1405            SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1406                .expect("Resolved should pass");
1407        specific_writer
1408            .write(obj1.clone(), &mut buf1)
1409            .expect("Serialization expected");
1410        specific_writer
1411            .write_value(obj1.clone(), &mut buf2)
1412            .expect("Serialization expected");
1413        generic_writer
1414            .write_value(obj1.into(), &mut buf3)
1415            .expect("Serialization expected");
1416        assert_eq!(buf1, buf2);
1417        assert_eq!(buf1, buf3);
1418
1419        Ok(())
1420    }
1421
1422    #[test]
1423    fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1424        const SCHEMA: &str = r#"
1425  {
1426      "type": "record",
1427      "name": "Conference",
1428      "fields": [
1429          {"type": "string", "name": "name"},
1430          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1431      ]
1432  }"#;
1433
1434        #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1435        pub struct Conference {
1436            pub name: String,
1437            pub time: Option<i64>,
1438        }
1439
1440        let conf = Conference {
1441            name: "RustConf".to_string(),
1442            time: Some(1234567890),
1443        };
1444
1445        let schema = Schema::parse_str(SCHEMA)?;
1446        let mut writer = Writer::new(&schema, Vec::new());
1447
1448        let bytes = writer.append_ser(conf)?;
1449
1450        assert_eq!(198, bytes);
1451
1452        Ok(())
1453    }
1454
1455    #[test]
1456    fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1457        const SCHEMA: &str = r#"
1458  {
1459      "type": "record",
1460      "name": "Conference",
1461      "fields": [
1462          {"type": "string", "name": "name"},
1463          {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1464      ]
1465  }"#;
1466
1467        #[derive(Debug, PartialEq, Clone, Serialize)]
1468        pub struct Conference {
1469            pub name: String,
1470            pub time: Option<f64>, // wrong type: f64 instead of i64
1471        }
1472
1473        let conf = Conference {
1474            name: "RustConf".to_string(),
1475            time: Some(12345678.90),
1476        };
1477
1478        let schema = Schema::parse_str(SCHEMA)?;
1479        let mut writer = Writer::new(&schema, Vec::new());
1480
1481        match writer.append_ser(conf) {
1482            Ok(bytes) => panic!("Expected an error, but got {} bytes written", bytes),
1483            Err(e) => {
1484                assert_eq!(
1485                    e.to_string(),
1486                    r#"Failed to serialize field 'time' for record Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Failed to serialize value of type f64 using schema Long: 12345678.9. Cause: Expected: Long. Got: Double"#
1487                );
1488            }
1489        }
1490        Ok(())
1491    }
1492
1493    #[test]
1494    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
1495        const SCHEMA: &str = r#"
1496        {
1497            "type": "record",
1498            "name": "ExampleSchema",
1499            "fields": [
1500                {"name": "exampleField", "type": "string"}
1501            ]
1502        }
1503        "#;
1504
1505        #[derive(Clone, Default)]
1506        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
1507
1508        impl TestBuffer {
1509            fn len(&self) -> usize {
1510                self.0.borrow().len()
1511            }
1512        }
1513
1514        impl Write for TestBuffer {
1515            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1516                self.0.borrow_mut().write(buf)
1517            }
1518
1519            fn flush(&mut self) -> std::io::Result<()> {
1520                Ok(())
1521            }
1522        }
1523
1524        let shared_buffer = TestBuffer::default();
1525
1526        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
1527
1528        let schema = Schema::parse_str(SCHEMA)?;
1529
1530        let mut writer = Writer::new(&schema, buffered_writer);
1531
1532        let mut record = Record::new(writer.schema()).unwrap();
1533        record.put("exampleField", "value");
1534
1535        writer.append(record)?;
1536        writer.flush()?;
1537
1538        assert_eq!(
1539            shared_buffer.len(),
1540            167,
1541            "the test buffer was not fully written to after Writer::flush was called"
1542        );
1543
1544        Ok(())
1545    }
1546}