apache_avro/
schema_compatibility.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic for checking schema compatibility.
19use crate::{
20    error::CompatibilityError,
21    schema::{
22        ArraySchema, DecimalSchema, EnumSchema, InnerDecimalSchema, MapSchema, RecordSchema,
23        Schema, UuidSchema,
24    },
25};
26use std::{
27    collections::{HashMap, hash_map::DefaultHasher},
28    hash::Hasher,
29    iter::once,
30    ops::BitAndAssign,
31    ptr,
32};
33
/// Namespace type for checking whether data written with one schema can be read with another.
///
/// It carries no state; use [`SchemaCompatibility::can_read`] for a one-directional check or
/// [`SchemaCompatibility::mutual_read`] to check both directions.
#[derive(Debug, Clone, Copy, Default)]
pub struct SchemaCompatibility;
35
/// How compatible are two schemas.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Compatibility {
    /// Full compatibility, resolving will always work.
    Full,
    /// Partial compatibility, resolving may error.
    ///
    /// This can happen if the reader's enum lacks some of the writer's symbols (and has no
    /// default symbol to fall back on), or if union variants don't entirely overlap.
    Partial,
}

impl BitAndAssign for Compatibility {
    /// Combine two compatibilities; the result is only [`Compatibility::Full`] if both are.
    ///
    /// # Truth table
    /// |         | Full    | Partial |
    /// | ------- | ------- | ------- |
    /// | Full    | Full    | Partial |
    /// | Partial | Partial | Partial |
    fn bitand_assign(&mut self, rhs: Self) {
        *self = match (*self, rhs) {
            (Self::Full, Self::Full) => Self::Full,
            _ => Self::Partial,
        };
    }
}
62
63struct Checker {
64    recursion: HashMap<(u64, u64), Compatibility>,
65}
66
67impl Checker {
68    /// Create a new checker, with recursion set to an empty set.
69    pub(crate) fn new() -> Self {
70        Self {
71            recursion: HashMap::new(),
72        }
73    }
74
75    /// Check if the reader schema can be resolved from the writer schema.
76    pub(crate) fn full_match_schemas(
77        &mut self,
78        writers_schema: &Schema,
79        readers_schema: &Schema,
80    ) -> Result<Compatibility, CompatibilityError> {
81        // Hash both reader and writer based on their pointer value. This is a fast way to see if
82        // we get the exact same schemas multiple times (because of recursive types)
83        let key = (
84            Self::pointer_hash(writers_schema),
85            Self::pointer_hash(readers_schema),
86        );
87
88        // If we already saw this pairing, return the previous value
89        if let Some(c) = self.recursion.get(&key).copied() {
90            Ok(c)
91        } else {
92            let c = self.inner_full_match_schemas(writers_schema, readers_schema)?;
93            // Insert the new value
94            self.recursion.insert(key, c);
95            Ok(c)
96        }
97    }
98
99    /// Hash a schema based only on its pointer value.
100    fn pointer_hash(schema: &Schema) -> u64 {
101        let mut hasher = DefaultHasher::new();
102        ptr::hash(schema, &mut hasher);
103        hasher.finish()
104    }
105
106    /// The actual implementation of [`full_match_schemas`] but without the recursion protection.
107    ///
108    /// This function should never be called directly as it can recurse infinitely on recursive types.
109    #[rustfmt::skip]
110    fn inner_full_match_schemas(
111        &mut self,
112        writers_schema: &Schema,
113        readers_schema: &Schema,
114    ) -> Result<Compatibility, CompatibilityError> {
115        // Compare unqualified names if the schemas have them
116        if let Some(w_name) = writers_schema.name()
117            && let Some(r_name) = readers_schema.name()
118            && w_name.name != r_name.name
119        {
120            return Err(CompatibilityError::NameMismatch {
121                writer_name: w_name.name.clone(),
122                reader_name: r_name.name.clone(),
123            });
124        }
125
126        // Logical types are downgraded to their actual type
127        match (writers_schema, readers_schema) {
128            (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => {
129                if r_name == w_name {
130                    Ok(Compatibility::Full)
131                } else {
132                    Err(CompatibilityError::NameMismatch {
133                        writer_name: w_name.fullname(None),
134                        reader_name: r_name.fullname(None),
135                    })
136                }
137            }
138            (Schema::Union(writer), Schema::Union(reader)) => {
139                let mut any = false;
140                let mut all = true;
141                for writer in &writer.schemas {
142                    // Try to find a reader variant that is fully compatible with this writer variant.
143                    // In case that does not exist, we keep track of any partial compatibility we find.
144                    let mut local_any = false;
145                    all &= reader.schemas.iter().any(|reader| {
146                        match self.full_match_schemas(writer, reader) {
147                            Ok(Compatibility::Full) => {
148                                local_any = true;
149                                true
150                            }
151                            Ok(Compatibility::Partial) => {
152                                local_any = true;
153                                false
154                            }
155                            Err(_) => false,
156                        }
157                    });
158                    // Save any match we found
159                    any |= local_any;
160                }
161                if all {
162                    // All writer variants are fully compatible with reader variants
163                    Ok(Compatibility::Full)
164                } else if any {
165                    // At least one writer variant is partially or fully compatible with a reader variant
166                    Ok(Compatibility::Partial)
167                } else {
168                    Err(CompatibilityError::MissingUnionElements)
169                }
170            }
171            (Schema::Union(writer), _) => {
172                // Check if all writer variants are fully compatible with the reader schema.
173                // We keep track of if we see any (partial) compatibility.
174                let mut any = false;
175                let mut all = true;
176                for writer in &writer.schemas {
177                    match self.full_match_schemas(writer, readers_schema) {
178                        Ok(Compatibility::Full) => any = true,
179                        Ok(Compatibility::Partial) => {
180                            any = true;
181                            all = false;
182                        }
183                        Err(_) => {
184                            all = false;
185                        }
186                    }
187                }
188                if all {
189                    // All writer variants are fully compatible with the reader schema
190                    Ok(Compatibility::Full)
191                } else if any {
192                    // At least one writer variant is partially compatible with the reader schema
193                    Ok(Compatibility::Partial)
194                } else {
195                    Err(CompatibilityError::SchemaMismatchAllUnionElements)
196                }
197            }
198            (_, Schema::Union(reader)) => {
199                // Try to find a fully compatible reader variant for the writer schema.
200                // In case that does not exist, we keep track of any partial compatibility.
201                let mut partial = false;
202                if reader.schemas.iter().any(|reader| {
203                    match self.full_match_schemas(writers_schema, reader) {
204                        Ok(Compatibility::Full) => true,
205                        Ok(Compatibility::Partial) => {
206                            partial = true;
207                            false
208                        }
209                        Err(_) => false,
210                    }
211                }) {
212                    // At least one reader variant is fully compatible with the writer schema
213                    Ok(Compatibility::Full)
214                } else if partial {
215                    // At least one reader variant is partially compatible with the writer schema
216                    Ok(Compatibility::Partial)
217                } else {
218                    Err(CompatibilityError::SchemaMismatchAllUnionElements)
219                }
220            }
221            (Schema::Null, Schema::Null) => Ok(Compatibility::Full),
222            (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full),
223            // int promotes to long, float and double
224            (
225                Schema::Int | Schema::Date | Schema::TimeMillis,
226                Schema::Int | Schema::Long | Schema::Float | Schema::Double | Schema::Date
227                | Schema::TimeMillis | Schema::TimeMicros | Schema::TimestampMillis
228                | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
229                | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
230            ) => Ok(Compatibility::Full),
231            // long promotes to float and double
232            (
233                Schema::Long | Schema::TimeMicros | Schema::TimestampMillis
234                | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
235                | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
236                Schema::Long | Schema::Float | Schema::Double | Schema::TimeMicros
237                | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos
238                | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
239                | Schema::LocalTimestampNanos,
240            ) => Ok(Compatibility::Full),
241            // float promotes to double
242            (Schema::Float, Schema::Float | Schema::Double) => Ok(Compatibility::Full),
243            (Schema::Double, Schema::Double) => Ok(Compatibility::Full),
244            // This check needs to be above the other Decimal checks, so that if both schemas are
245            // Decimal then the precision and scale are checked. The other Decimal checks below will
246            // thus only hit if only one of the schemas is a Decimal
247            (
248                Schema::Decimal(DecimalSchema { precision: w_precision, scale: w_scale, .. }),
249                Schema::Decimal(DecimalSchema { precision: r_precision, scale: r_scale, .. }),
250            ) => {
251                // precision and scale must match
252                if r_precision == w_precision && r_scale == w_scale {
253                    Ok(Compatibility::Full)
254                } else {
255                    Err(CompatibilityError::DecimalMismatch {
256                        r_precision: *r_precision,
257                        r_scale: *r_scale,
258                        w_precision: *w_precision,
259                        w_scale: *w_scale
260                    })
261                }
262            }
263            // bytes and strings are interchangeable
264            (
265                Schema::Bytes | Schema::String | Schema::BigDecimal
266                | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
267                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
268                Schema::Bytes | Schema::String | Schema::BigDecimal
269                | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
270                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
271            ) => Ok(Compatibility::Full),
272            (Schema::Uuid(_), Schema::Uuid(_)) => Ok(Compatibility::Full),
273            (
274                Schema::Fixed(w_fixed) | Schema::Uuid(UuidSchema::Fixed(w_fixed))
275                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(w_fixed), .. })
276                | Schema::Duration(w_fixed),
277                Schema::Fixed(r_fixed) | Schema::Uuid(UuidSchema::Fixed(r_fixed))
278                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(r_fixed), .. })
279                | Schema::Duration(r_fixed),
280            ) => {
281                // Size must match
282                if r_fixed.size == w_fixed.size {
283                    Ok(Compatibility::Full)
284                } else {
285                    Err(CompatibilityError::FixedMismatch)
286                }
287            }
288            (
289                Schema::Array(ArraySchema { items: w_items, .. }),
290                Schema::Array(ArraySchema { items: r_items, .. }),
291            ) => {
292                // array schemas must match
293                self.full_match_schemas(w_items, r_items)
294            }
295            (
296                Schema::Map(MapSchema { types: w_types, .. }),
297                Schema::Map(MapSchema { types: r_types, .. }),
298            ) => {
299                // type schemas must match
300                self.full_match_schemas(w_types, r_types)
301            }
302            (
303                Schema::Enum(EnumSchema { symbols: w_symbols, .. }),
304                Schema::Enum(EnumSchema { symbols: r_symbols, default: r_default, .. }),
305            ) => {
306                // Reader must have a default or all symbols in the writer must also be in the reader
307                if r_default.is_some() {
308                    // No need to iter over all the fields if there is a default
309                    Ok(Compatibility::Full)
310                } else {
311                    let mut any = false;
312                    let mut all = true;
313                    w_symbols.iter().for_each(|s| {
314                        let res = r_symbols.contains(s);
315                        any |= res;
316                        all &= res;
317                    });
318                    if all {
319                        // All symbols match
320                        Ok(Compatibility::Full)
321                    } else if any {
322                        // Only some symbols match
323                        Ok(Compatibility::Partial)
324                    } else {
325                        // No symbols match
326                        Err(CompatibilityError::MissingSymbols)
327                    }
328                }
329            }
330            (
331                Schema::Record(RecordSchema { fields: w_fields, .. }),
332                Schema::Record(RecordSchema { fields: r_fields, .. }),
333            ) => {
334                let mut compatibility = Compatibility::Full;
335                for r_field in r_fields {
336                    // Can't use RecordField.lookup as aliases are also inserted into there and we
337                    // are not allowed to match on writer aliases.
338                    // Search using field name and *after* that aliases.
339                    if let Some(w_field) = once(&r_field.name)
340                        .chain(r_field.aliases.as_deref().unwrap_or(&[]).iter())
341                        .find_map(|ra| w_fields.iter().find(|wf| &wf.name == ra))
342                    {
343                        // Check that the schemas are compatible
344                        match self.full_match_schemas(&w_field.schema, &r_field.schema) {
345                            Ok(c) => compatibility &= c,
346                            Err(err) => {
347                                return Err(CompatibilityError::FieldTypeMismatch(
348                                    r_field.name.clone(),
349                                    Box::new(err),
350                                ));
351                            }
352                        }
353                    } else if r_field.default.is_none() {
354                        // No default and no matching field in the writer
355                        return Err(CompatibilityError::MissingDefaultValue(
356                            r_field.name.clone(),
357                        ));
358                    }
359                }
360                Ok(compatibility)
361            }
362            (_, _) => Err(CompatibilityError::WrongType {
363                writer_schema_type: format!("{writers_schema:#?}"),
364                reader_schema_type: format!("{readers_schema:#?}"),
365            }),
366        }
367    }
368
369    pub(crate) fn can_read(
370        &mut self,
371        writers_schema: &Schema,
372        readers_schema: &Schema,
373    ) -> Result<Compatibility, CompatibilityError> {
374        self.full_match_schemas(writers_schema, readers_schema)
375    }
376}
377
378impl SchemaCompatibility {
379    /// `can_read` performs a full, recursive check that a datum written using the
380    /// writers_schema can be read using the readers_schema.
381    pub fn can_read(
382        writers_schema: &Schema,
383        readers_schema: &Schema,
384    ) -> Result<Compatibility, CompatibilityError> {
385        let mut c = Checker::new();
386        c.can_read(writers_schema, readers_schema)
387    }
388
389    /// `mutual_read` performs a full, recursive check that a datum written using either
390    /// the writers_schema or the readers_schema can be read using the other schema.
391    pub fn mutual_read(
392        writers_schema: &Schema,
393        readers_schema: &Schema,
394    ) -> Result<Compatibility, CompatibilityError> {
395        let mut c = SchemaCompatibility::can_read(writers_schema, readers_schema)?;
396        c &= SchemaCompatibility::can_read(readers_schema, writers_schema)?;
397        Ok(c)
398    }
399}
400
401#[cfg(test)]
402mod tests {
403    use std::collections::BTreeMap;
404
405    use super::*;
406    use crate::{
407        Codec, Decimal, Reader, Writer,
408        schema::{FixedSchema, Name, UuidSchema},
409        types::{Record, Value},
410    };
411    use apache_avro_test_helper::TestResult;
412    use rstest::*;
413
414    fn int_array_schema() -> Schema {
415        Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
416    }
417
418    fn long_array_schema() -> Schema {
419        Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
420    }
421
422    fn string_array_schema() -> Schema {
423        Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
424    }
425
426    fn int_map_schema() -> Schema {
427        Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
428    }
429
430    fn long_map_schema() -> Schema {
431        Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
432    }
433
434    fn string_map_schema() -> Schema {
435        Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
436    }
437
438    fn enum1_ab_schema() -> Schema {
439        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
440    }
441
442    fn enum1_abc_schema() -> Schema {
443        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
444    }
445
446    fn enum1_bc_schema() -> Schema {
447        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
448    }
449
450    fn enum2_ab_schema() -> Schema {
451        Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
452    }
453
454    fn empty_record1_schema() -> Schema {
455        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
456    }
457
458    fn empty_record2_schema() -> Schema {
459        Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
460    }
461
462    fn a_int_record1_schema() -> Schema {
463        Schema::parse_str(
464            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
465        )
466        .unwrap()
467    }
468
469    fn a_long_record1_schema() -> Schema {
470        Schema::parse_str(
471            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
472        )
473        .unwrap()
474    }
475
476    fn a_int_b_int_record1_schema() -> Schema {
477        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
478    }
479
480    fn a_dint_record1_schema() -> Schema {
481        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
482    }
483
484    fn a_int_b_dint_record1_schema() -> Schema {
485        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
486    }
487
488    fn a_dint_b_dint_record1_schema() -> Schema {
489        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
490    }
491
492    fn nested_record() -> Schema {
493        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
494    }
495
496    fn nested_optional_record() -> Schema {
497        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
498    }
499
500    fn int_list_record_schema() -> Schema {
501        Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
502    }
503
504    fn long_list_record_schema() -> Schema {
505        Schema::parse_str(
506            r#"
507      {
508        "type":"record", "name":"List", "fields": [
509          {"name": "head", "type": "long"},
510          {"name": "tail", "type": "array", "items": "long"}
511      ]}
512"#,
513        )
514        .unwrap()
515    }
516
517    fn union_schema(schemas: Vec<Schema>) -> Schema {
518        let schema_string = schemas
519            .iter()
520            .map(|s| s.canonical_form())
521            .collect::<Vec<String>>()
522            .join(",");
523        Schema::parse_str(&format!("[{schema_string}]")).unwrap()
524    }
525
526    fn empty_union_schema() -> Schema {
527        union_schema(vec![])
528    }
529
530    // unused
531    // fn null_union_schema() -> Schema { union_schema(vec![Schema::Null]) }
532
533    fn int_union_schema() -> Schema {
534        union_schema(vec![Schema::Int])
535    }
536
537    fn long_union_schema() -> Schema {
538        union_schema(vec![Schema::Long])
539    }
540
541    fn string_union_schema() -> Schema {
542        union_schema(vec![Schema::String])
543    }
544
545    fn int_string_union_schema() -> Schema {
546        union_schema(vec![Schema::Int, Schema::String])
547    }
548
549    fn string_int_union_schema() -> Schema {
550        union_schema(vec![Schema::String, Schema::Int])
551    }
552
553    #[test]
554    fn test_broken() {
555        assert_eq!(
556            Compatibility::Partial,
557            SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema()).unwrap(),
558            "Only compatible if writer writes an int"
559        )
560    }
561
562    #[test]
563    fn test_incompatible_reader_writer_pairs() {
564        let incompatible_schemas = vec![
565            // null
566            (Schema::Null, Schema::Int),
567            (Schema::Null, Schema::Long),
568            // boolean
569            (Schema::Boolean, Schema::Int),
570            // int
571            (Schema::Int, Schema::Null),
572            (Schema::Int, Schema::Boolean),
573            (Schema::Int, Schema::Long),
574            (Schema::Int, Schema::Float),
575            (Schema::Int, Schema::Double),
576            // long
577            (Schema::Long, Schema::Float),
578            (Schema::Long, Schema::Double),
579            // float
580            (Schema::Float, Schema::Double),
581            // string
582            (Schema::String, Schema::Boolean),
583            (Schema::String, Schema::Int),
584            // bytes
585            (Schema::Bytes, Schema::Null),
586            (Schema::Bytes, Schema::Int),
587            // logical types
588            (Schema::TimeMicros, Schema::Int),
589            (Schema::TimestampMillis, Schema::Int),
590            (Schema::TimestampMicros, Schema::Int),
591            (Schema::TimestampNanos, Schema::Int),
592            (Schema::LocalTimestampMillis, Schema::Int),
593            (Schema::LocalTimestampMicros, Schema::Int),
594            (Schema::LocalTimestampNanos, Schema::Int),
595            (Schema::Date, Schema::Long),
596            (Schema::TimeMillis, Schema::Long),
597            // array and maps
598            (int_array_schema(), long_array_schema()),
599            (int_map_schema(), int_array_schema()),
600            (int_array_schema(), int_map_schema()),
601            (int_map_schema(), long_map_schema()),
602            // enum
603            (enum1_ab_schema(), enum1_abc_schema()),
604            (enum1_bc_schema(), enum1_abc_schema()),
605            (enum1_ab_schema(), enum2_ab_schema()),
606            (Schema::Int, enum2_ab_schema()),
607            (enum2_ab_schema(), Schema::Int),
608            //union
609            (int_union_schema(), int_string_union_schema()),
610            (string_union_schema(), int_string_union_schema()),
611            //record
612            (empty_record2_schema(), empty_record1_schema()),
613            (a_int_record1_schema(), empty_record1_schema()),
614            (a_int_b_dint_record1_schema(), empty_record1_schema()),
615            (int_list_record_schema(), long_list_record_schema()),
616            (nested_record(), nested_optional_record()),
617        ];
618
619        assert!(
620            incompatible_schemas
621                .iter()
622                .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
623        );
624    }
625
626    #[rstest]
627    // Record type test
628    #[case(
629        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
630        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
631    )]
632    // Fixed type test
633    #[case(
634        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
635        r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
636    )]
637    // Enum type test
638    #[case(
639        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
640        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
641    )]
642    // Map type test
643    #[case(
644        r#"{"type": "map", "values": "int"}"#,
645        r#"{"type": "map", "values": "long"}"#
646    )]
647    // Date type
648    #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
649    // time-millis type
650    #[case(
651        r#"{"type": "int"}"#,
652        r#"{"type": "int", "logicalType": "time-millis"}"#
653    )]
654    // time-micros type
655    #[case(
656        r#"{"type": "long"}"#,
657        r#"{"type": "long", "logicalType": "time-micros"}"#
658    )]
659    // timestamp-nanos type
660    #[case(
661        r#"{"type": "long"}"#,
662        r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
663    )]
664    // timestamp-millis type
665    #[case(
666        r#"{"type": "long"}"#,
667        r#"{"type": "long", "logicalType": "timestamp-millis"}"#
668    )]
669    // timestamp-micros type
670    #[case(
671        r#"{"type": "long"}"#,
672        r#"{"type": "long", "logicalType": "timestamp-micros"}"#
673    )]
674    // local-timestamp-millis type
675    #[case(
676        r#"{"type": "long"}"#,
677        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
678    )]
679    // local-timestamp-micros type
680    #[case(
681        r#"{"type": "long"}"#,
682        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
683    )]
684    // local-timestamp-nanos type
685    #[case(
686        r#"{"type": "long"}"#,
687        r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
688    )]
689    // Array type test
690    #[case(
691        r#"{"type": "array", "items": "int"}"#,
692        r#"{"type": "array", "items": "long"}"#
693    )]
694    fn test_avro_3950_match_schemas_ok(
695        #[case] writer_schema_str: &str,
696        #[case] reader_schema_str: &str,
697    ) {
698        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
699        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
700
701        assert!(SchemaCompatibility::can_read(&writer_schema, &reader_schema).is_ok());
702    }
703
704    #[rstest]
705    // Record type test
706    #[case(
707        r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
708        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
709        CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
710    )]
711    // Fixed type test
712    #[case(
713        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
714        r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
715        CompatibilityError::FixedMismatch
716    )]
717    // Enum type test
718    #[case(
719        r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
720        r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
721        CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
722    )]
723    // Map type test
724    #[case(
725        r#"{"type":"map", "values": "long"}"#,
726        r#"{"type":"map", "values": "int"}"#,
727        CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
728    )]
729    // Array type test
730    #[case(
731        r#"{"type": "array", "items": "long"}"#,
732        r#"{"type": "array", "items": "int"}"#,
733        CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
734    )]
735    // Date type test
736    #[case(
737        r#"{"type": "string"}"#,
738        r#"{"type": "int", "logicalType": "date"}"#,
739        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "Date".to_string() }
740    )]
741    // time-millis type
742    #[case(
743        r#"{"type": "string"}"#,
744        r#"{"type": "int", "logicalType": "time-millis"}"#,
745        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMillis".to_string() }
746    )]
747    // time-millis type
748    #[case(
749        r#"{"type": "string"}"#,
750        r#"{"type": "long", "logicalType": "time-micros"}"#,
751        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMicros".to_string() }
752    )]
753    // timestamp-nanos type
754    #[case(
755        r#"{"type": "string"}"#,
756        r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
757        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampNanos".to_string() }
758    )]
759    // timestamp-millis type
760    #[case(
761        r#"{"type": "string"}"#,
762        r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
763        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMillis".to_string() }
764    )]
765    // timestamp-micros type
766    #[case(
767        r#"{"type": "string"}"#,
768        r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
769        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMicros".to_string() }
770    )]
771    // local-timestamp-millis type
772    #[case(
773        r#"{"type": "string"}"#,
774        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
775        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMillis".to_string() }
776    )]
777    // local-timestamp-micros type
778    #[case(
779        r#"{"type": "string"}"#,
780        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
781        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMicros".to_string() }
782    )]
783    // local-timestamp-nanos type
784    #[case(
785        r#"{"type": "string"}"#,
786        r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
787        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampNanos".to_string() }
788    )]
789    // Names are checked first, so this should not be a WrongType
790    #[case(
791        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
792        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
793        CompatibilityError::NameMismatch { writer_name: "record_b".to_string(), reader_name: "EmployeeId".to_string() }
794    )]
795    fn test_avro_3950_match_schemas_error(
796        #[case] writer_schema_str: &str,
797        #[case] reader_schema_str: &str,
798        #[case] expected_error: CompatibilityError,
799    ) {
800        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
801        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
802
803        assert_eq!(
804            expected_error,
805            SchemaCompatibility::can_read(&writer_schema, &reader_schema).unwrap_err()
806        )
807    }
808
809    #[test]
810    fn test_compatible_reader_writer_pairs() {
811        let uuid_fixed = FixedSchema {
812            name: Name::new("uuid_fixed").unwrap(),
813            aliases: None,
814            doc: None,
815            size: 16,
816            default: None,
817            attributes: Default::default(),
818        };
819
820        let compatible_schemas = vec![
821            (Schema::Null, Schema::Null),
822            (Schema::Long, Schema::Int),
823            (Schema::Float, Schema::Int),
824            (Schema::Float, Schema::Long),
825            (Schema::Double, Schema::Long),
826            (Schema::Double, Schema::Int),
827            (Schema::Double, Schema::Float),
828            (Schema::String, Schema::Bytes),
829            (Schema::Bytes, Schema::String),
830            // logical types
831            (
832                Schema::Uuid(UuidSchema::String),
833                Schema::Uuid(UuidSchema::String),
834            ),
835            (Schema::Uuid(UuidSchema::String), Schema::String),
836            (Schema::String, Schema::Uuid(UuidSchema::String)),
837            (
838                Schema::Uuid(UuidSchema::Bytes),
839                Schema::Uuid(UuidSchema::Bytes),
840            ),
841            (Schema::Uuid(UuidSchema::Bytes), Schema::Bytes),
842            (Schema::Bytes, Schema::Uuid(UuidSchema::Bytes)),
843            (
844                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
845                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
846            ),
847            (
848                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
849                Schema::Fixed(uuid_fixed.clone()),
850            ),
851            (
852                Schema::Fixed(uuid_fixed.clone()),
853                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
854            ),
855            (Schema::Date, Schema::Int),
856            (Schema::TimeMillis, Schema::Int),
857            (Schema::TimeMicros, Schema::Long),
858            (Schema::TimestampMillis, Schema::Long),
859            (Schema::TimestampMicros, Schema::Long),
860            (Schema::TimestampNanos, Schema::Long),
861            (Schema::LocalTimestampMillis, Schema::Long),
862            (Schema::LocalTimestampMicros, Schema::Long),
863            (Schema::LocalTimestampNanos, Schema::Long),
864            (Schema::Int, Schema::Date),
865            (Schema::Int, Schema::TimeMillis),
866            (Schema::Long, Schema::TimeMicros),
867            (Schema::Long, Schema::TimestampMillis),
868            (Schema::Long, Schema::TimestampMicros),
869            (Schema::Long, Schema::TimestampNanos),
870            (Schema::Long, Schema::LocalTimestampMillis),
871            (Schema::Long, Schema::LocalTimestampMicros),
872            (Schema::Long, Schema::LocalTimestampNanos),
873            (int_array_schema(), int_array_schema()),
874            (long_array_schema(), int_array_schema()),
875            (int_map_schema(), int_map_schema()),
876            (long_map_schema(), int_map_schema()),
877            (enum1_ab_schema(), enum1_ab_schema()),
878            (enum1_abc_schema(), enum1_ab_schema()),
879            (empty_union_schema(), empty_union_schema()),
880            (int_union_schema(), int_union_schema()),
881            (int_string_union_schema(), string_int_union_schema()),
882            (int_union_schema(), empty_union_schema()),
883            (long_union_schema(), int_union_schema()),
884            (int_union_schema(), Schema::Int),
885            (Schema::Int, int_union_schema()),
886            (empty_record1_schema(), empty_record1_schema()),
887            (empty_record1_schema(), a_int_record1_schema()),
888            (a_int_record1_schema(), a_int_record1_schema()),
889            (a_dint_record1_schema(), a_int_record1_schema()),
890            (a_dint_record1_schema(), a_dint_record1_schema()),
891            (a_int_record1_schema(), a_dint_record1_schema()),
892            (a_long_record1_schema(), a_int_record1_schema()),
893            (a_int_record1_schema(), a_int_b_int_record1_schema()),
894            (a_dint_record1_schema(), a_int_b_int_record1_schema()),
895            (a_int_b_dint_record1_schema(), a_int_record1_schema()),
896            (a_dint_b_dint_record1_schema(), empty_record1_schema()),
897            (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
898            (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
899            (int_list_record_schema(), int_list_record_schema()),
900            (long_list_record_schema(), long_list_record_schema()),
901            (long_list_record_schema(), int_list_record_schema()),
902            (nested_optional_record(), nested_record()),
903        ];
904
905        for (reader, writer) in compatible_schemas {
906            SchemaCompatibility::can_read(&writer, &reader).unwrap();
907        }
908    }
909
910    fn writer_schema() -> Schema {
911        Schema::parse_str(
912            r#"
913      {"type":"record", "name":"Record", "fields":[
914        {"name":"oldfield1", "type":"int"},
915        {"name":"oldfield2", "type":"string"}
916      ]}
917"#,
918        )
919        .unwrap()
920    }
921
922    #[test]
923    fn test_missing_field() -> TestResult {
924        let reader_schema = Schema::parse_str(
925            r#"
926      {"type":"record", "name":"Record", "fields":[
927        {"name":"oldfield1", "type":"int"}
928      ]}
929"#,
930        )?;
931        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok());
932        assert_eq!(
933            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
934            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
935        );
936
937        Ok(())
938    }
939
940    #[test]
941    fn test_missing_second_field() -> TestResult {
942        let reader_schema = Schema::parse_str(
943            r#"
944        {"type":"record", "name":"Record", "fields":[
945          {"name":"oldfield2", "type":"string"}
946        ]}
947"#,
948        )?;
949        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
950        assert_eq!(
951            CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
952            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
953        );
954
955        Ok(())
956    }
957
958    #[test]
959    fn test_all_fields() -> TestResult {
960        let reader_schema = Schema::parse_str(
961            r#"
962        {"type":"record", "name":"Record", "fields":[
963          {"name":"oldfield1", "type":"int"},
964          {"name":"oldfield2", "type":"string"}
965        ]}
966"#,
967        )?;
968        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
969        assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok());
970
971        Ok(())
972    }
973
974    #[test]
975    fn test_new_field_with_default() -> TestResult {
976        let reader_schema = Schema::parse_str(
977            r#"
978        {"type":"record", "name":"Record", "fields":[
979          {"name":"oldfield1", "type":"int"},
980          {"name":"newfield1", "type":"int", "default":42}
981        ]}
982"#,
983        )?;
984        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
985        assert_eq!(
986            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
987            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
988        );
989
990        Ok(())
991    }
992
993    #[test]
994    fn test_new_field() -> TestResult {
995        let reader_schema = Schema::parse_str(
996            r#"
997        {"type":"record", "name":"Record", "fields":[
998          {"name":"oldfield1", "type":"int"},
999          {"name":"newfield1", "type":"int"}
1000        ]}
1001"#,
1002        )?;
1003        assert_eq!(
1004            CompatibilityError::MissingDefaultValue(String::from("newfield1")),
1005            SchemaCompatibility::can_read(&writer_schema(), &reader_schema).unwrap_err()
1006        );
1007        assert_eq!(
1008            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1009            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1010        );
1011
1012        Ok(())
1013    }
1014
1015    #[test]
1016    fn test_array_writer_schema() {
1017        let valid_reader = string_array_schema();
1018        let invalid_reader = string_map_schema();
1019
1020        assert_eq!(
1021            Compatibility::Full,
1022            SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).unwrap()
1023        );
1024        assert!(matches!(
1025            SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader),
1026            Err(CompatibilityError::WrongType { .. }),
1027        ));
1028    }
1029
1030    #[test]
1031    fn test_primitive_writer_schema() {
1032        let valid_reader = Schema::String;
1033        assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok());
1034        assert_eq!(
1035            CompatibilityError::WrongType {
1036                writer_schema_type: "Int".to_string(),
1037                reader_schema_type: "String".to_string()
1038            },
1039            SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err()
1040        );
1041    }
1042
1043    #[test]
1044    fn test_union_reader_writer_subset_incompatibility() {
1045        // reader union schema must contain all writer union branches
1046        let union_writer = union_schema(vec![Schema::Int, Schema::String]);
1047        let union_reader = union_schema(vec![Schema::String]);
1048
1049        assert_eq!(
1050            Compatibility::Partial,
1051            SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap()
1052        );
1053        assert_eq!(
1054            Compatibility::Full,
1055            SchemaCompatibility::can_read(&union_reader, &union_writer).unwrap()
1056        );
1057    }
1058
1059    #[test]
1060    fn test_incompatible_record_field() -> TestResult {
1061        let string_schema = Schema::parse_str(
1062            r#"
1063        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1064            {"name":"field1", "type":"string"}
1065        ]}
1066        "#,
1067        )?;
1068
1069        let int_schema = Schema::parse_str(
1070            r#"
1071              {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1072                {"name":"field1", "type":"int"}
1073              ]}
1074        "#,
1075        )?;
1076
1077        assert_eq!(
1078            CompatibilityError::FieldTypeMismatch(
1079                "field1".to_owned(),
1080                Box::new(CompatibilityError::WrongType {
1081                    writer_schema_type: "String".to_string(),
1082                    reader_schema_type: "Int".to_string()
1083                })
1084            ),
1085            SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err()
1086        );
1087
1088        Ok(())
1089    }
1090
1091    #[test]
1092    fn test_enum_symbols() -> TestResult {
1093        let enum_schema1 = Schema::parse_str(
1094            r#"
1095      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
1096"#,
1097        )?;
1098        let enum_schema2 =
1099            Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
1100        assert_eq!(
1101            Compatibility::Partial,
1102            SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)?
1103        );
1104        assert_eq!(
1105            Compatibility::Full,
1106            SchemaCompatibility::can_read(&enum_schema1, &enum_schema2)?
1107        );
1108
1109        Ok(())
1110    }
1111
1112    fn point_2d_schema() -> Schema {
1113        Schema::parse_str(
1114            r#"
1115      {"type":"record", "name":"Point2D", "fields":[
1116        {"name":"x", "type":"double"},
1117        {"name":"y", "type":"double"}
1118      ]}
1119    "#,
1120        )
1121        .unwrap()
1122    }
1123
1124    fn point_2d_fullname_schema() -> Schema {
1125        Schema::parse_str(
1126            r#"
1127      {"type":"record", "name":"Point", "namespace":"written", "fields":[
1128        {"name":"x", "type":"double"},
1129        {"name":"y", "type":"double"}
1130      ]}
1131    "#,
1132        )
1133        .unwrap()
1134    }
1135
1136    fn point_3d_no_default_schema() -> Schema {
1137        Schema::parse_str(
1138            r#"
1139      {"type":"record", "name":"Point", "fields":[
1140        {"name":"x", "type":"double"},
1141        {"name":"y", "type":"double"},
1142        {"name":"z", "type":"double"}
1143      ]}
1144    "#,
1145        )
1146        .unwrap()
1147    }
1148
1149    fn point_3d_schema() -> Schema {
1150        Schema::parse_str(
1151            r#"
1152      {"type":"record", "name":"Point3D", "fields":[
1153        {"name":"x", "type":"double"},
1154        {"name":"y", "type":"double"},
1155        {"name":"z", "type":"double", "default": 0.0}
1156      ]}
1157    "#,
1158        )
1159        .unwrap()
1160    }
1161
1162    fn point_3d_match_name_schema() -> Schema {
1163        Schema::parse_str(
1164            r#"
1165      {"type":"record", "name":"Point", "fields":[
1166        {"name":"x", "type":"double"},
1167        {"name":"y", "type":"double"},
1168        {"name":"z", "type":"double", "default": 0.0}
1169      ]}
1170    "#,
1171        )
1172        .unwrap()
1173    }
1174
1175    #[test]
1176    fn test_union_resolution_no_structure_match() {
1177        // short name match, but no structure match
1178        let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
1179        assert_eq!(
1180            CompatibilityError::SchemaMismatchAllUnionElements,
1181            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1182        );
1183    }
1184
1185    #[test]
1186    fn test_union_resolution_first_structure_match_2d() {
1187        // multiple structure matches with no name matches
1188        let read_schema = union_schema(vec![
1189            Schema::Null,
1190            point_3d_no_default_schema(),
1191            point_2d_schema(),
1192            point_3d_schema(),
1193        ]);
1194        assert_eq!(
1195            CompatibilityError::SchemaMismatchAllUnionElements,
1196            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1197        );
1198    }
1199
1200    #[test]
1201    fn test_union_resolution_first_structure_match_3d() {
1202        // multiple structure matches with no name matches
1203        let read_schema = union_schema(vec![
1204            Schema::Null,
1205            point_3d_no_default_schema(),
1206            point_3d_schema(),
1207            point_2d_schema(),
1208        ]);
1209        assert_eq!(
1210            CompatibilityError::SchemaMismatchAllUnionElements,
1211            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1212        );
1213    }
1214
1215    #[test]
1216    fn test_union_resolution_named_structure_match() {
1217        // multiple structure matches with a short name match
1218        let read_schema = union_schema(vec![
1219            Schema::Null,
1220            point_2d_schema(),
1221            point_3d_match_name_schema(),
1222            point_3d_schema(),
1223        ]);
1224        assert_eq!(
1225            CompatibilityError::SchemaMismatchAllUnionElements,
1226            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1227        );
1228    }
1229
1230    #[test]
1231    fn test_union_resolution_full_name_match() {
1232        // there is a full name match that should be chosen
1233        let read_schema = union_schema(vec![
1234            Schema::Null,
1235            point_2d_schema(),
1236            point_3d_match_name_schema(),
1237            point_3d_schema(),
1238            point_2d_fullname_schema(),
1239        ]);
1240        assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).is_ok());
1241    }
1242
1243    #[test]
1244    fn test_avro_3772_enum_default() -> TestResult {
1245        let writer_raw_schema = r#"
1246        {
1247          "type": "record",
1248          "name": "test",
1249          "fields": [
1250            {"name": "a", "type": "long", "default": 42},
1251            {"name": "b", "type": "string"},
1252            {
1253              "name": "c",
1254              "type": {
1255                "type": "enum",
1256                "name": "suit",
1257                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1258                "default": "spades"
1259              }
1260            }
1261          ]
1262        }
1263        "#;
1264
1265        let reader_raw_schema = r#"
1266        {
1267          "type": "record",
1268          "name": "test",
1269          "fields": [
1270            {"name": "a", "type": "long", "default": 42},
1271            {"name": "b", "type": "string"},
1272            {
1273              "name": "c",
1274              "type": {
1275                 "type": "enum",
1276                 "name": "suit",
1277                 "symbols": ["diamonds", "spades", "ninja", "hearts"],
1278                 "default": "spades"
1279              }
1280            }
1281          ]
1282        }
1283      "#;
1284        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1285        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1286        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
1287        let mut record = Record::new(writer.schema()).unwrap();
1288        record.put("a", 27i64);
1289        record.put("b", "foo");
1290        record.put("c", "clubs");
1291        writer.append_value(record).unwrap();
1292        let input = writer.into_inner()?;
1293        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1294        assert_eq!(
1295            reader.next().unwrap().unwrap(),
1296            Value::Record(vec![
1297                ("a".to_string(), Value::Long(27)),
1298                ("b".to_string(), Value::String("foo".to_string())),
1299                ("c".to_string(), Value::Enum(1, "spades".to_string())),
1300            ])
1301        );
1302        assert!(reader.next().is_none());
1303
1304        Ok(())
1305    }
1306
1307    #[test]
1308    fn test_avro_3772_enum_default_less_symbols() -> TestResult {
1309        let writer_raw_schema = r#"
1310        {
1311          "type": "record",
1312          "name": "test",
1313          "fields": [
1314            {"name": "a", "type": "long", "default": 42},
1315            {"name": "b", "type": "string"},
1316            {
1317              "name": "c",
1318              "type": {
1319                "type": "enum",
1320                "name": "suit",
1321                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1322                "default": "spades"
1323              }
1324            }
1325          ]
1326        }
1327        "#;
1328
1329        let reader_raw_schema = r#"
1330        {
1331          "type": "record",
1332          "name": "test",
1333          "fields": [
1334            {"name": "a", "type": "long", "default": 42},
1335            {"name": "b", "type": "string"},
1336            {
1337              "name": "c",
1338              "type": {
1339                 "type": "enum",
1340                  "name": "suit",
1341                  "symbols": ["hearts", "spades"],
1342                  "default": "spades"
1343              }
1344            }
1345          ]
1346        }
1347      "#;
1348        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1349        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1350        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
1351        let mut record = Record::new(writer.schema()).unwrap();
1352        record.put("a", 27i64);
1353        record.put("b", "foo");
1354        record.put("c", "hearts");
1355        writer.append_value(record).unwrap();
1356        let input = writer.into_inner()?;
1357        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1358        assert_eq!(
1359            reader.next().unwrap().unwrap(),
1360            Value::Record(vec![
1361                ("a".to_string(), Value::Long(27)),
1362                ("b".to_string(), Value::String("foo".to_string())),
1363                ("c".to_string(), Value::Enum(0, "hearts".to_string())),
1364            ])
1365        );
1366        assert!(reader.next().is_none());
1367
1368        Ok(())
1369    }
1370
1371    #[test]
1372    fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
1373    {
1374        let schema_v1 = Schema::parse_str(
1375            r#"
1376        {
1377            "type": "record",
1378            "name": "Conference",
1379            "namespace": "advdaba",
1380            "fields": [
1381                {"type": "string", "name": "name"},
1382                {"type": "long", "name": "date"}
1383            ]
1384        }"#,
1385        )?;
1386
1387        let schema_v2 = Schema::parse_str(
1388            r#"
1389        {
1390            "type": "record",
1391            "name": "Conference",
1392            "namespace": "advdaba",
1393            "fields": [
1394                {"type": "string", "name": "name"},
1395                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1396            ]
1397        }"#,
1398        )?;
1399
1400        assert!(SchemaCompatibility::mutual_read(&schema_v1, &schema_v2).is_ok());
1401
1402        Ok(())
1403    }
1404
1405    #[test]
1406    fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
1407        let schema_v1 = Schema::parse_str(
1408            r#"
1409        {
1410            "type": "record",
1411            "name": "Conference",
1412            "namespace": "advdaba",
1413            "fields": [
1414                {"type": "string", "name": "name"},
1415                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1416            ]
1417        }"#,
1418        )?;
1419
1420        let schema_v2 = Schema::parse_str(
1421            r#"
1422        {
1423            "type": "record",
1424            "name": "Conference",
1425            "namespace": "advdaba",
1426            "fields": [
1427                {"type": "string", "name": "name"},
1428                {"type": "long", "name": "time"}
1429            ]
1430        }"#,
1431        )?;
1432
1433        assert_eq!(
1434            Compatibility::Full,
1435            SchemaCompatibility::can_read(&schema_v2, &schema_v1)?
1436        );
1437        assert_eq!(
1438            CompatibilityError::MissingDefaultValue(String::from("time")),
1439            SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err()
1440        );
1441
1442        Ok(())
1443    }
1444
1445    #[test]
1446    fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
1447        let schemas = [
1448            // Record schemas
1449            (
1450                Schema::parse_str(
1451                    r#"{
1452              "type": "record",
1453              "name": "Statistics",
1454              "fields": [
1455                { "name": "success", "type": "int" },
1456                { "name": "fail", "type": "int" },
1457                { "name": "time", "type": "string" },
1458                { "name": "max", "type": "int", "default": 0 }
1459              ]
1460            }"#,
1461                )?,
1462                Schema::parse_str(
1463                    r#"{
1464              "type": "record",
1465              "name": "Statistics",
1466              "namespace": "my.namespace",
1467              "fields": [
1468                { "name": "success", "type": "int" },
1469                { "name": "fail", "type": "int" },
1470                { "name": "time", "type": "string" },
1471                { "name": "average", "type": "int", "default": 0}
1472              ]
1473            }"#,
1474                )?,
1475            ),
1476            // Enum schemas
1477            (
1478                Schema::parse_str(
1479                    r#"{
1480                    "type": "enum",
1481                    "name": "Suit",
1482                    "symbols": ["diamonds", "spades", "clubs"]
1483                }"#,
1484                )?,
1485                Schema::parse_str(
1486                    r#"{
1487                    "type": "enum",
1488                    "name": "Suit",
1489                    "namespace": "my.namespace",
1490                    "symbols": ["diamonds", "spades", "clubs", "hearts"]
1491                }"#,
1492                )?,
1493            ),
1494            // Fixed schemas
1495            (
1496                Schema::parse_str(
1497                    r#"{
1498                    "type": "fixed",
1499                    "name": "EmployeeId",
1500                    "size": 16
1501                }"#,
1502                )?,
1503                Schema::parse_str(
1504                    r#"{
1505                    "type": "fixed",
1506                    "name": "EmployeeId",
1507                    "namespace": "my.namespace",
1508                    "size": 16
1509                }"#,
1510                )?,
1511            ),
1512        ];
1513
1514        for (schema_1, schema_2) in schemas {
1515            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1516        }
1517
1518        Ok(())
1519    }
1520
1521    #[test]
1522    fn test_can_read_compatibility_errors() -> TestResult {
1523        let schemas = [
1524            (
1525                Schema::parse_str(
1526                    r#"{
1527                    "type": "record",
1528                    "name": "StatisticsMap",
1529                    "fields": [
1530                        {"name": "average", "type": "int", "default": 0},
1531                        {"name": "success", "type": {"type": "map", "values": "int"}}
1532                    ]
1533                }"#,
1534                )?,
1535                Schema::parse_str(
1536                    r#"{
1537                    "type": "record",
1538                    "name": "StatisticsMap",
1539                    "fields": [
1540                        {"name": "average", "type": "int", "default": 0},
1541                        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
1542                    ]
1543                }"#,
1544                )?,
1545            ),
1546            (
1547                Schema::parse_str(
1548                    r#"{
1549                        "type": "record",
1550                        "name": "StatisticsArray",
1551                        "fields": [
1552                            {"name": "max_values", "type": {"type": "array", "items": "int"}}
1553                        ]
1554                    }"#,
1555                )?,
1556                Schema::parse_str(
1557                    r#"{
1558                        "type": "record",
1559                        "name": "StatisticsArray",
1560                        "fields": [
1561                            {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
1562                        ]
1563                    }"#,
1564                )?,
1565            ),
1566        ];
1567
1568        for (schema_1, schema_2) in schemas {
1569            assert_eq!(
1570                Compatibility::Full,
1571                SchemaCompatibility::can_read(&schema_1, &schema_2).unwrap()
1572            );
1573            assert_eq!(
1574                Compatibility::Partial,
1575                SchemaCompatibility::can_read(&schema_2, &schema_1).unwrap()
1576            );
1577        }
1578
1579        Ok(())
1580    }
1581
1582    #[test]
1583    fn avro_3974_can_read_schema_references() -> TestResult {
1584        let schema_strs = vec![
1585            r#"{
1586          "type": "record",
1587          "name": "Child",
1588          "namespace": "avro",
1589          "fields": [
1590            {
1591              "name": "val",
1592              "type": "int"
1593            }
1594          ]
1595        }
1596        "#,
1597            r#"{
1598          "type": "record",
1599          "name": "Parent",
1600          "namespace": "avro",
1601          "fields": [
1602            {
1603              "name": "child",
1604              "type": "avro.Child"
1605            }
1606          ]
1607        }
1608        "#,
1609        ];
1610
1611        let schemas = Schema::parse_list(schema_strs).unwrap();
1612        SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1613
1614        Ok(())
1615    }
1616
1617    #[test]
1618    fn duration_and_fixed_of_different_size() -> TestResult {
1619        let schema_strs = vec![
1620            r#"{
1621          "type": "fixed",
1622          "name": "Fixed25",
1623          "size": 25
1624        }
1625        "#,
1626            r#"{
1627          "type": "fixed",
1628          "logicalType": "duration",
1629          "name": "Duration",
1630          "size": 12
1631        }
1632        "#,
1633        ];
1634
1635        let schemas = Schema::parse_list(schema_strs).unwrap();
1636        assert!(SchemaCompatibility::can_read(&schemas[0], &schemas[1]).is_err());
1637        assert!(SchemaCompatibility::can_read(&schemas[1], &schemas[0]).is_err());
1638        SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1639        SchemaCompatibility::can_read(&schemas[0], &schemas[0])?;
1640
1641        Ok(())
1642    }
1643
1644    #[test]
1645    fn avro_rs_342_decimal_fixed_and_bytes() -> TestResult {
1646        let bytes = Schema::Decimal(DecimalSchema {
1647            precision: 20,
1648            scale: 0,
1649            inner: InnerDecimalSchema::Bytes,
1650        });
1651        let fixed = Schema::Decimal(DecimalSchema {
1652            precision: 20,
1653            scale: 0,
1654            inner: InnerDecimalSchema::Fixed(FixedSchema {
1655                name: Name::new("DecimalFixed")?,
1656                aliases: None,
1657                doc: None,
1658                size: 20,
1659                default: None,
1660                attributes: BTreeMap::default(),
1661            }),
1662        });
1663
1664        assert_eq!(
1665            Compatibility::Full,
1666            SchemaCompatibility::mutual_read(&bytes, &fixed)?
1667        );
1668
1669        let value = Value::Decimal(Decimal::from(vec![1; 10]));
1670        let fixed_value = value.clone().resolve(&fixed)?;
1671        let bytes_value = value.resolve(&bytes)?;
1672
1673        assert_eq!(fixed_value, bytes_value);
1674
1675        Ok(())
1676    }
1677}