apache_avro/
schema_compatibility.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Check if the reader's schema is compatible with the writer's schema.
19//!
20//! To allow for schema evolution, Avro supports resolving the writer's schema to the reader's schema.
21//! To check if this is possible, [`SchemaCompatibility`] can be used. For the complete rules see
22//! [the specification](https://avro.apache.org/docs/++version++/specification/#schema-resolution).
23//!
24//! There are three levels of compatibility.
25//!
26//! 1. Fully compatible schemas (`Ok(Compatibility::Full)`)
27//!
28//! For example, an integer can always be resolved to a long:
29//!
30//! ```
31//! # use apache_avro::{Schema, schema_compatibility::{Compatibility, SchemaCompatibility}};
32//! let writers_schema = Schema::array(Schema::Int).build();
33//! let readers_schema = Schema::array(Schema::Long).build();
34//! assert_eq!(SchemaCompatibility::can_read(&writers_schema, &readers_schema), Ok(Compatibility::Full));
35//! ```
36//!
37//! 2. Incompatible schemas (`Err`)
38//!
39//! For example, a long can never be resolved to an int:
40//!
41//! ```
42//! # use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};
43//! let writers_schema = Schema::array(Schema::Long).build();
44//! let readers_schema = Schema::array(Schema::Int).build();
45//! assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_err());
46//! ```
47//!
48//! 3. Partially compatible schemas (`Ok(Compatibility::Partial)`)
49//!
50//! For example, a union of a string and integer is only compatible with an integer if an integer was written:
51//!
52//! ```
53//! # use apache_avro::{Error, Schema, schema_compatibility::{Compatibility, SchemaCompatibility}};
54//! let writers_schema = Schema::union(vec![Schema::Int, Schema::String])?;
55//! let readers_schema = Schema::Int;
56//! assert_eq!(SchemaCompatibility::can_read(&writers_schema, &readers_schema), Ok(Compatibility::Partial));
57//! # Ok::<(), Error>(())
58//! ```
59//!
60use crate::{
61    error::CompatibilityError,
62    schema::{
63        ArraySchema, DecimalSchema, EnumSchema, InnerDecimalSchema, MapSchema, RecordSchema,
64        Schema, UuidSchema,
65    },
66};
67use std::{
68    collections::{HashMap, hash_map::DefaultHasher},
69    hash::Hasher,
70    iter::once,
71    ops::BitAndAssign,
72    ptr,
73};
74
75/// Check if two schemas can be resolved.
76///
77/// See [the module documentation] for more details.
78///
79/// [the module documentation]: crate::schema_compatibility
80pub struct SchemaCompatibility;
81
82impl SchemaCompatibility {
83    /// Recursively check if the reader's schema can be resolved to the writer's schema
84    pub fn can_read(
85        writers_schema: &Schema,
86        readers_schema: &Schema,
87    ) -> Result<Compatibility, CompatibilityError> {
88        let mut c = Checker::new();
89        c.can_read(writers_schema, readers_schema)
90    }
91
92    /// Recursively check if both schemas can be resolved to each other
93    pub fn mutual_read(
94        schema_a: &Schema,
95        schema_b: &Schema,
96    ) -> Result<Compatibility, CompatibilityError> {
97        let mut c = SchemaCompatibility::can_read(schema_a, schema_b)?;
98        c &= SchemaCompatibility::can_read(schema_b, schema_a)?;
99        Ok(c)
100    }
101}
102
/// The degree to which two schemas are compatible.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Compatibility {
    /// Full compatibility, resolving will always work.
    Full,
    /// Partial compatibility, resolving may error.
    ///
    /// This can happen if an enum doesn't have all symbols, or unions don't entirely overlap.
    Partial,
}

impl BitAndAssign for Compatibility {
    /// Combine two compatibilities, keeping the weaker of the two.
    ///
    /// # Truth table
    /// |         | Full    | Partial |
    /// | ------- | ------- | ------- |
    /// | Full    | Full    | Partial |
    /// | Partial | Partial | Partial |
    fn bitand_assign(&mut self, rhs: Self) {
        // `Full` on the right never changes the left-hand side; only `Partial` can downgrade it.
        if rhs == Self::Partial {
            *self = Self::Partial;
        }
    }
}
129
/// Stateful helper that performs the compatibility check for a single writer/reader pair.
struct Checker {
    /// Results of pairs that were already checked successfully.
    ///
    /// The key is `(writer, reader)`, where each schema is represented by the hash of its
    /// pointer value rather than by its contents.
    recursion: HashMap<(u64, u64), Compatibility>,
}
133
134impl Checker {
135    /// Create a new checker, with recursion set to an empty set.
136    pub(crate) fn new() -> Self {
137        Self {
138            recursion: HashMap::new(),
139        }
140    }
141
142    /// Check if the reader schema can be resolved from the writer schema.
143    pub(crate) fn full_match_schemas(
144        &mut self,
145        writers_schema: &Schema,
146        readers_schema: &Schema,
147    ) -> Result<Compatibility, CompatibilityError> {
148        // Hash both reader and writer based on their pointer value. This is a fast way to see if
149        // we get the exact same schemas multiple times (because of recursive types)
150        let key = (
151            Self::pointer_hash(writers_schema),
152            Self::pointer_hash(readers_schema),
153        );
154
155        // If we already saw this pairing, return the previous value
156        if let Some(c) = self.recursion.get(&key).copied() {
157            Ok(c)
158        } else {
159            let c = self.inner_full_match_schemas(writers_schema, readers_schema)?;
160            // Insert the new value
161            self.recursion.insert(key, c);
162            Ok(c)
163        }
164    }
165
166    /// Hash a schema based only on its pointer value.
167    fn pointer_hash(schema: &Schema) -> u64 {
168        let mut hasher = DefaultHasher::new();
169        ptr::hash(schema, &mut hasher);
170        hasher.finish()
171    }
172
    /// The actual implementation of [`Self::full_match_schemas`] but without the recursion protection.
    ///
    /// This function should never be called directly as it can recurse infinitely on recursive types.
    ///
    /// The check runs in two steps: first the unqualified names are compared when both schemas
    /// have one, then the pair is dispatched on its variants. The order of the `match` arms
    /// matters: the `(Union, Union)` arm precedes the `(Union, _)` and `(_, Union)` arms, and the
    /// Decimal/Decimal arm precedes the arms that treat a Decimal as bytes or fixed.
    ///
    /// Nested schemas (union variants, array items, map values and record fields) are checked
    /// through [`Self::full_match_schemas`].
    ///
    /// Returns `Ok(Compatibility::Full)` or `Ok(Compatibility::Partial)` when the schemas can be
    /// resolved, and a [`CompatibilityError`] describing the mismatch otherwise.
    #[rustfmt::skip]
    fn inner_full_match_schemas(
        &mut self,
        writers_schema: &Schema,
        readers_schema: &Schema,
    ) -> Result<Compatibility, CompatibilityError> {
        // Compare unqualified names if the schemas have them
        if let Some(w_name) = writers_schema.name()
            && let Some(r_name) = readers_schema.name()
            && w_name.name != r_name.name
        {
            return Err(CompatibilityError::NameMismatch {
                writer_name: w_name.name.clone(),
                reader_name: r_name.name.clone(),
            });
        }

        // Logical types are downgraded to their actual type
        match (writers_schema, readers_schema) {
            // Two references are compatible only if they refer to the same name
            (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => {
                if r_name == w_name {
                    Ok(Compatibility::Full)
                } else {
                    Err(CompatibilityError::NameMismatch {
                        writer_name: w_name.fullname(None),
                        reader_name: r_name.fullname(None),
                    })
                }
            }
            (Schema::Union(writer), Schema::Union(reader)) => {
                // `any`: at least one writer variant matched some reader variant (fully or partially)
                // `all`: every writer variant has a fully compatible reader variant
                let mut any = false;
                let mut all = true;
                for writer in &writer.schemas {
                    // Try to find a reader variant that is fully compatible with this writer variant.
                    // In case that does not exist, we keep track of any partial compatibility we find.
                    let mut local_any = false;
                    // `Iterator::any` stops at the first reader variant that is fully compatible.
                    // Errors of individual variants are discarded.
                    all &= reader.schemas.iter().any(|reader| {
                        match self.full_match_schemas(writer, reader) {
                            Ok(Compatibility::Full) => {
                                local_any = true;
                                true
                            }
                            Ok(Compatibility::Partial) => {
                                local_any = true;
                                false
                            }
                            Err(_) => false,
                        }
                    });
                    // Save any match we found
                    any |= local_any;
                }
                if all {
                    // All writer variants are fully compatible with reader variants
                    Ok(Compatibility::Full)
                } else if any {
                    // At least one writer variant is partially or fully compatible with a reader variant
                    Ok(Compatibility::Partial)
                } else {
                    Err(CompatibilityError::MissingUnionElements)
                }
            }
            (Schema::Union(writer), _) => {
                // Check if all writer variants are fully compatible with the reader schema.
                // We keep track of if we see any (partial) compatibility.
                let mut any = false;
                let mut all = true;
                for writer in &writer.schemas {
                    match self.full_match_schemas(writer, readers_schema) {
                        Ok(Compatibility::Full) => any = true,
                        Ok(Compatibility::Partial) => {
                            any = true;
                            all = false;
                        }
                        // The error of an individual variant is discarded
                        Err(_) => {
                            all = false;
                        }
                    }
                }
                if all {
                    // All writer variants are fully compatible with the reader schema
                    Ok(Compatibility::Full)
                } else if any {
                    // At least one writer variant is partially or fully compatible with the reader schema
                    Ok(Compatibility::Partial)
                } else {
                    Err(CompatibilityError::SchemaMismatchAllUnionElements)
                }
            }
            (_, Schema::Union(reader)) => {
                // Try to find a fully compatible reader variant for the writer schema.
                // In case that does not exist, we keep track of any partial compatibility.
                let mut partial = false;
                // `Iterator::any` stops at the first fully compatible reader variant
                if reader.schemas.iter().any(|reader| {
                    match self.full_match_schemas(writers_schema, reader) {
                        Ok(Compatibility::Full) => true,
                        Ok(Compatibility::Partial) => {
                            partial = true;
                            false
                        }
                        Err(_) => false,
                    }
                }) {
                    // At least one reader variant is fully compatible with the writer schema
                    Ok(Compatibility::Full)
                } else if partial {
                    // At least one reader variant is partially compatible with the writer schema
                    Ok(Compatibility::Partial)
                } else {
                    Err(CompatibilityError::SchemaMismatchAllUnionElements)
                }
            }
            (Schema::Null, Schema::Null) => Ok(Compatibility::Full),
            (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full),
            // int promotes to long, float and double
            // (Date and TimeMillis writers are treated like int; the long-based logical types
            // are accepted on the reader side as well)
            (
                Schema::Int | Schema::Date | Schema::TimeMillis,
                Schema::Int | Schema::Long | Schema::Float | Schema::Double | Schema::Date
                | Schema::TimeMillis | Schema::TimeMicros | Schema::TimestampMillis
                | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
                | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
            ) => Ok(Compatibility::Full),
            // long promotes to float and double
            // (the long-based logical types are treated like long on both sides)
            (
                Schema::Long | Schema::TimeMicros | Schema::TimestampMillis
                | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
                | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
                Schema::Long | Schema::Float | Schema::Double | Schema::TimeMicros
                | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos
                | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
                | Schema::LocalTimestampNanos,
            ) => Ok(Compatibility::Full),
            // float promotes to double
            (Schema::Float, Schema::Float | Schema::Double) => Ok(Compatibility::Full),
            (Schema::Double, Schema::Double) => Ok(Compatibility::Full),
            // This check needs to be above the other Decimal checks, so that if both schemas are
            // Decimal then the precision and scale are checked. The other Decimal checks below will
            // thus only hit if only one of the schemas is a Decimal
            (
                Schema::Decimal(DecimalSchema { precision: w_precision, scale: w_scale, .. }),
                Schema::Decimal(DecimalSchema { precision: r_precision, scale: r_scale, .. }),
            ) => {
                // precision and scale must match
                if r_precision == w_precision && r_scale == w_scale {
                    Ok(Compatibility::Full)
                } else {
                    Err(CompatibilityError::DecimalMismatch {
                        r_precision: *r_precision,
                        r_scale: *r_scale,
                        w_precision: *w_precision,
                        w_scale: *w_scale
                    })
                }
            }
            // bytes and strings are interchangeable
            // (this also covers BigDecimal, string/bytes based Uuid and bytes based Decimal)
            (
                Schema::Bytes | Schema::String | Schema::BigDecimal
                | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
                Schema::Bytes | Schema::String | Schema::BigDecimal
                | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
            ) => Ok(Compatibility::Full),
            // Any two Uuid schemas that reach this arm are accepted, regardless of their
            // underlying representation
            (Schema::Uuid(_), Schema::Uuid(_)) => Ok(Compatibility::Full),
            // Everything backed by a fixed schema is compared only by its size here
            (
                Schema::Fixed(w_fixed) | Schema::Uuid(UuidSchema::Fixed(w_fixed))
                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(w_fixed), .. })
                | Schema::Duration(w_fixed),
                Schema::Fixed(r_fixed) | Schema::Uuid(UuidSchema::Fixed(r_fixed))
                | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(r_fixed), .. })
                | Schema::Duration(r_fixed),
            ) => {
                // Size must match
                if r_fixed.size == w_fixed.size {
                    Ok(Compatibility::Full)
                } else {
                    Err(CompatibilityError::FixedMismatch)
                }
            }
            (
                Schema::Array(ArraySchema { items: w_items, .. }),
                Schema::Array(ArraySchema { items: r_items, .. }),
            ) => {
                // array schemas must match
                self.full_match_schemas(w_items, r_items)
            }
            (
                Schema::Map(MapSchema { types: w_types, .. }),
                Schema::Map(MapSchema { types: r_types, .. }),
            ) => {
                // type schemas must match
                self.full_match_schemas(w_types, r_types)
            }
            (
                Schema::Enum(EnumSchema { symbols: w_symbols, .. }),
                Schema::Enum(EnumSchema { symbols: r_symbols, default: r_default, .. }),
            ) => {
                // Reader must have a default or all symbols in the writer must also be in the reader
                if r_default.is_some() {
                    // No need to iter over all the fields if there is a default
                    Ok(Compatibility::Full)
                } else {
                    // `any`: at least one writer symbol exists in the reader
                    // `all`: every writer symbol exists in the reader
                    let mut any = false;
                    let mut all = true;
                    w_symbols.iter().for_each(|s| {
                        let res = r_symbols.contains(s);
                        any |= res;
                        all &= res;
                    });
                    if all {
                        // All symbols match
                        Ok(Compatibility::Full)
                    } else if any {
                        // Only some symbols match
                        Ok(Compatibility::Partial)
                    } else {
                        // No symbols match
                        Err(CompatibilityError::MissingSymbols)
                    }
                }
            }
            (
                Schema::Record(RecordSchema { fields: w_fields, .. }),
                Schema::Record(RecordSchema { fields: r_fields, .. }),
            ) => {
                // Only the reader's fields drive the check; writer fields that no reader field
                // refers to are never looked at.
                let mut compatibility = Compatibility::Full;
                for r_field in r_fields {
                    // Can't use RecordField.lookup as aliases are also inserted into there and we
                    // are not allowed to match on writer aliases.
                    // Search using field name and *after* that aliases.
                    if let Some(w_field) = once(&r_field.name)
                        .chain(r_field.aliases.as_deref().unwrap_or(&[]).iter())
                        .find_map(|ra| w_fields.iter().find(|wf| &wf.name == ra))
                    {
                        // Check that the schemas are compatible
                        match self.full_match_schemas(&w_field.schema, &r_field.schema) {
                            Ok(c) => compatibility &= c,
                            Err(err) => {
                                // Wrap the nested error so the offending reader field is reported
                                return Err(CompatibilityError::FieldTypeMismatch(
                                    r_field.name.clone(),
                                    Box::new(err),
                                ));
                            }
                        }
                    } else if r_field.default.is_none() {
                        // No default and no matching field in the writer
                        return Err(CompatibilityError::MissingDefaultValue(
                            r_field.name.clone(),
                        ));
                    }
                }
                Ok(compatibility)
            }
            // Every other combination is a type mismatch; both schemas are reported using their
            // pretty-printed `Debug` representation
            (_, _) => Err(CompatibilityError::WrongType {
                writer_schema_type: format!("{writers_schema:#?}"),
                reader_schema_type: format!("{readers_schema:#?}"),
            }),
        }
    }
435
    /// Check if data written with `writers_schema` can be read with `readers_schema`.
    ///
    /// This forwards directly to [`Self::full_match_schemas`].
    pub(crate) fn can_read(
        &mut self,
        writers_schema: &Schema,
        readers_schema: &Schema,
    ) -> Result<Compatibility, CompatibilityError> {
        self.full_match_schemas(writers_schema, readers_schema)
    }
443}
444
445#[cfg(test)]
446mod tests {
447    use std::collections::BTreeMap;
448
449    use super::*;
450    use crate::{
451        Codec, Decimal, Reader, Writer,
452        schema::{FixedSchema, Name, UuidSchema},
453        types::{Record, Value},
454    };
455    use apache_avro_test_helper::TestResult;
456    use rstest::*;
457
458    fn int_array_schema() -> Schema {
459        Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
460    }
461
462    fn long_array_schema() -> Schema {
463        Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
464    }
465
466    fn string_array_schema() -> Schema {
467        Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
468    }
469
470    fn int_map_schema() -> Schema {
471        Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
472    }
473
474    fn long_map_schema() -> Schema {
475        Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
476    }
477
478    fn string_map_schema() -> Schema {
479        Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
480    }
481
482    fn enum1_ab_schema() -> Schema {
483        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
484    }
485
486    fn enum1_abc_schema() -> Schema {
487        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
488    }
489
490    fn enum1_bc_schema() -> Schema {
491        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
492    }
493
494    fn enum2_ab_schema() -> Schema {
495        Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
496    }
497
498    fn empty_record1_schema() -> Schema {
499        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
500    }
501
502    fn empty_record2_schema() -> Schema {
503        Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
504    }
505
506    fn a_int_record1_schema() -> Schema {
507        Schema::parse_str(
508            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
509        )
510        .unwrap()
511    }
512
513    fn a_long_record1_schema() -> Schema {
514        Schema::parse_str(
515            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
516        )
517        .unwrap()
518    }
519
520    fn a_int_b_int_record1_schema() -> Schema {
521        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
522    }
523
524    fn a_dint_record1_schema() -> Schema {
525        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
526    }
527
528    fn a_int_b_dint_record1_schema() -> Schema {
529        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
530    }
531
532    fn a_dint_b_dint_record1_schema() -> Schema {
533        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
534    }
535
536    fn nested_record() -> Schema {
537        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
538    }
539
540    fn nested_optional_record() -> Schema {
541        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
542    }
543
544    fn int_list_record_schema() -> Schema {
545        Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
546    }
547
548    fn long_list_record_schema() -> Schema {
549        Schema::parse_str(
550            r#"
551      {
552        "type":"record", "name":"List", "fields": [
553          {"name": "head", "type": "long"},
554          {"name": "tail", "type": "array", "items": "long"}
555      ]}
556"#,
557        )
558        .unwrap()
559    }
560
561    fn union_schema(schemas: Vec<Schema>) -> Schema {
562        let schema_string = schemas
563            .iter()
564            .map(|s| s.canonical_form())
565            .collect::<Vec<String>>()
566            .join(",");
567        Schema::parse_str(&format!("[{schema_string}]")).unwrap()
568    }
569
570    fn empty_union_schema() -> Schema {
571        union_schema(vec![])
572    }
573
574    // unused
575    // fn null_union_schema() -> Schema { union_schema(vec![Schema::Null]) }
576
577    fn int_union_schema() -> Schema {
578        union_schema(vec![Schema::Int])
579    }
580
581    fn long_union_schema() -> Schema {
582        union_schema(vec![Schema::Long])
583    }
584
585    fn string_union_schema() -> Schema {
586        union_schema(vec![Schema::String])
587    }
588
589    fn int_string_union_schema() -> Schema {
590        union_schema(vec![Schema::Int, Schema::String])
591    }
592
593    fn string_int_union_schema() -> Schema {
594        union_schema(vec![Schema::String, Schema::Int])
595    }
596
597    #[test]
598    fn test_broken() {
599        assert_eq!(
600            Compatibility::Partial,
601            SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema()).unwrap(),
602            "Only compatible if writer writes an int"
603        )
604    }
605
    // Feeds a list of schema pairs through `SchemaCompatibility::can_read` and checks for errors.
    #[test]
    fn test_incompatible_reader_writer_pairs() {
        // Each tuple is `(reader, writer)`; the closure at the bottom swaps them into the
        // `(writer, reader)` argument order of `can_read`.
        let incompatible_schemas = vec![
            // null
            (Schema::Null, Schema::Int),
            (Schema::Null, Schema::Long),
            // boolean
            (Schema::Boolean, Schema::Int),
            // int
            (Schema::Int, Schema::Null),
            (Schema::Int, Schema::Boolean),
            (Schema::Int, Schema::Long),
            (Schema::Int, Schema::Float),
            (Schema::Int, Schema::Double),
            // long
            (Schema::Long, Schema::Float),
            (Schema::Long, Schema::Double),
            // float
            (Schema::Float, Schema::Double),
            // string
            (Schema::String, Schema::Boolean),
            (Schema::String, Schema::Int),
            // bytes
            (Schema::Bytes, Schema::Null),
            (Schema::Bytes, Schema::Int),
            // logical types
            (Schema::TimeMicros, Schema::Int),
            (Schema::TimestampMillis, Schema::Int),
            (Schema::TimestampMicros, Schema::Int),
            (Schema::TimestampNanos, Schema::Int),
            (Schema::LocalTimestampMillis, Schema::Int),
            (Schema::LocalTimestampMicros, Schema::Int),
            (Schema::LocalTimestampNanos, Schema::Int),
            (Schema::Date, Schema::Long),
            (Schema::TimeMillis, Schema::Long),
            // array and maps
            (int_array_schema(), long_array_schema()),
            (int_map_schema(), int_array_schema()),
            (int_array_schema(), int_map_schema()),
            (int_map_schema(), long_map_schema()),
            // enum
            (enum1_ab_schema(), enum1_abc_schema()),
            (enum1_bc_schema(), enum1_abc_schema()),
            (enum1_ab_schema(), enum2_ab_schema()),
            (Schema::Int, enum2_ab_schema()),
            (enum2_ab_schema(), Schema::Int),
            //union
            (int_union_schema(), int_string_union_schema()),
            (string_union_schema(), int_string_union_schema()),
            //record
            (empty_record2_schema(), empty_record1_schema()),
            (a_int_record1_schema(), empty_record1_schema()),
            (a_int_b_dint_record1_schema(), empty_record1_schema()),
            (int_list_record_schema(), long_list_record_schema()),
            (nested_record(), nested_optional_record()),
        ];

        // `any` is satisfied as soon as a single pair is rejected, so this assertion does not
        // verify that every listed pair is incompatible.
        // NOTE(review): confirm whether a per-pair check was intended; several pairs in the list
        // look like they resolve to `Ok` (e.g. partially overlapping enums and unions).
        assert!(
            incompatible_schemas
                .iter()
                .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
        );
    }
669
    // Parameterized test: every case is a `(writer, reader)` pair of JSON schemas for which
    // `SchemaCompatibility::can_read` must return `Ok` (either full or partial compatibility).
    #[rstest]
    // Record type test
    #[case(
        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
    )]
    // Fixed type test
    #[case(
        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
        r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
    )]
    // Enum type test
    #[case(
        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
    )]
    // Map type test
    #[case(
        r#"{"type": "map", "values": "int"}"#,
        r#"{"type": "map", "values": "long"}"#
    )]
    // Date type
    #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
    // time-millis type
    #[case(
        r#"{"type": "int"}"#,
        r#"{"type": "int", "logicalType": "time-millis"}"#
    )]
    // time-micros type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "time-micros"}"#
    )]
    // timestamp-nanos type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
    )]
    // timestamp-millis type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "timestamp-millis"}"#
    )]
    // timestamp-micros type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "timestamp-micros"}"#
    )]
    // local-timestamp-millis type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
    )]
    // local-timestamp-micros type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
    )]
    // local-timestamp-nanos type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
    )]
    // Array type test
    #[case(
        r#"{"type": "array", "items": "int"}"#,
        r#"{"type": "array", "items": "long"}"#
    )]
    fn test_avro_3950_match_schemas_ok(
        #[case] writer_schema_str: &str,
        #[case] reader_schema_str: &str,
    ) {
        // Both strings must be valid schemas; a parse failure panics here
        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();

        assert!(SchemaCompatibility::can_read(&writer_schema, &reader_schema).is_ok());
    }
747
748    #[rstest]
749    // Record type test
750    #[case(
751        r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
752        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
753        CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
754    )]
755    // Fixed type test
756    #[case(
757        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
758        r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
759        CompatibilityError::FixedMismatch
760    )]
761    // Enum type test
762    #[case(
763        r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
764        r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
765        CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
766    )]
767    // Map type test
768    #[case(
769        r#"{"type":"map", "values": "long"}"#,
770        r#"{"type":"map", "values": "int"}"#,
771        CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
772    )]
773    // Array type test
774    #[case(
775        r#"{"type": "array", "items": "long"}"#,
776        r#"{"type": "array", "items": "int"}"#,
777        CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
778    )]
779    // Date type test
780    #[case(
781        r#"{"type": "string"}"#,
782        r#"{"type": "int", "logicalType": "date"}"#,
783        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "Date".to_string() }
784    )]
785    // time-millis type
786    #[case(
787        r#"{"type": "string"}"#,
788        r#"{"type": "int", "logicalType": "time-millis"}"#,
789        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMillis".to_string() }
790    )]
791    // time-millis type
792    #[case(
793        r#"{"type": "string"}"#,
794        r#"{"type": "long", "logicalType": "time-micros"}"#,
795        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMicros".to_string() }
796    )]
797    // timestamp-nanos type
798    #[case(
799        r#"{"type": "string"}"#,
800        r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
801        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampNanos".to_string() }
802    )]
803    // timestamp-millis type
804    #[case(
805        r#"{"type": "string"}"#,
806        r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
807        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMillis".to_string() }
808    )]
809    // timestamp-micros type
810    #[case(
811        r#"{"type": "string"}"#,
812        r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
813        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMicros".to_string() }
814    )]
815    // local-timestamp-millis type
816    #[case(
817        r#"{"type": "string"}"#,
818        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
819        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMillis".to_string() }
820    )]
821    // local-timestamp-micros type
822    #[case(
823        r#"{"type": "string"}"#,
824        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
825        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMicros".to_string() }
826    )]
827    // local-timestamp-nanos type
828    #[case(
829        r#"{"type": "string"}"#,
830        r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
831        CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampNanos".to_string() }
832    )]
833    // Names are checked first, so this should not be a WrongType
834    #[case(
835        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
836        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
837        CompatibilityError::NameMismatch { writer_name: "record_b".to_string(), reader_name: "EmployeeId".to_string() }
838    )]
839    fn test_avro_3950_match_schemas_error(
840        #[case] writer_schema_str: &str,
841        #[case] reader_schema_str: &str,
842        #[case] expected_error: CompatibilityError,
843    ) {
844        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
845        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
846
847        assert_eq!(
848            expected_error,
849            SchemaCompatibility::can_read(&writer_schema, &reader_schema).unwrap_err()
850        )
851    }
852
// Verifies that every (reader, writer) pair below is accepted by
// `SchemaCompatibility::can_read`, at any compatibility level.
#[test]
fn test_compatible_reader_writer_pairs() {
    // 16-byte fixed schema shared by all the fixed-backed UUID cases.
    let uuid_fixed = FixedSchema {
        name: Name::new("uuid_fixed").unwrap(),
        aliases: None,
        doc: None,
        size: 16,
        attributes: Default::default(),
    };

    // NOTE: tuples are ordered (reader, writer), the reverse of `can_read`'s arguments.
    let compatible_schemas = vec![
        (Schema::Null, Schema::Null),
        (Schema::Long, Schema::Int),
        (Schema::Float, Schema::Int),
        (Schema::Float, Schema::Long),
        (Schema::Double, Schema::Long),
        (Schema::Double, Schema::Int),
        (Schema::Double, Schema::Float),
        (Schema::String, Schema::Bytes),
        (Schema::Bytes, Schema::String),
        // logical types
        (
            Schema::Uuid(UuidSchema::String),
            Schema::Uuid(UuidSchema::String),
        ),
        (Schema::Uuid(UuidSchema::String), Schema::String),
        (Schema::String, Schema::Uuid(UuidSchema::String)),
        (
            Schema::Uuid(UuidSchema::Bytes),
            Schema::Uuid(UuidSchema::Bytes),
        ),
        (Schema::Uuid(UuidSchema::Bytes), Schema::Bytes),
        (Schema::Bytes, Schema::Uuid(UuidSchema::Bytes)),
        (
            Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
            Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
        ),
        (
            Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
            Schema::Fixed(uuid_fixed.clone()),
        ),
        (
            Schema::Fixed(uuid_fixed.clone()),
            Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
        ),
        (Schema::Date, Schema::Int),
        (Schema::TimeMillis, Schema::Int),
        (Schema::TimeMicros, Schema::Long),
        (Schema::TimestampMillis, Schema::Long),
        (Schema::TimestampMicros, Schema::Long),
        (Schema::TimestampNanos, Schema::Long),
        (Schema::LocalTimestampMillis, Schema::Long),
        (Schema::LocalTimestampMicros, Schema::Long),
        (Schema::LocalTimestampNanos, Schema::Long),
        (Schema::Int, Schema::Date),
        (Schema::Int, Schema::TimeMillis),
        (Schema::Long, Schema::TimeMicros),
        (Schema::Long, Schema::TimestampMillis),
        (Schema::Long, Schema::TimestampMicros),
        (Schema::Long, Schema::TimestampNanos),
        (Schema::Long, Schema::LocalTimestampMillis),
        (Schema::Long, Schema::LocalTimestampMicros),
        (Schema::Long, Schema::LocalTimestampNanos),
        (int_array_schema(), int_array_schema()),
        (long_array_schema(), int_array_schema()),
        (int_map_schema(), int_map_schema()),
        (long_map_schema(), int_map_schema()),
        (enum1_ab_schema(), enum1_ab_schema()),
        (enum1_abc_schema(), enum1_ab_schema()),
        (empty_union_schema(), empty_union_schema()),
        (int_union_schema(), int_union_schema()),
        (int_string_union_schema(), string_int_union_schema()),
        (int_union_schema(), empty_union_schema()),
        (long_union_schema(), int_union_schema()),
        (int_union_schema(), Schema::Int),
        (Schema::Int, int_union_schema()),
        (empty_record1_schema(), empty_record1_schema()),
        (empty_record1_schema(), a_int_record1_schema()),
        (a_int_record1_schema(), a_int_record1_schema()),
        (a_dint_record1_schema(), a_int_record1_schema()),
        (a_dint_record1_schema(), a_dint_record1_schema()),
        (a_int_record1_schema(), a_dint_record1_schema()),
        (a_long_record1_schema(), a_int_record1_schema()),
        (a_int_record1_schema(), a_int_b_int_record1_schema()),
        (a_dint_record1_schema(), a_int_b_int_record1_schema()),
        (a_int_b_dint_record1_schema(), a_int_record1_schema()),
        (a_dint_b_dint_record1_schema(), empty_record1_schema()),
        (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
        (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
        (int_list_record_schema(), int_list_record_schema()),
        (long_list_record_schema(), long_list_record_schema()),
        (long_list_record_schema(), int_list_record_schema()),
        (nested_optional_record(), nested_record()),
    ];

    for (reader, writer) in compatible_schemas {
        // Report the offending pair: with ~70 cases a bare `unwrap()` panic
        // does not reveal which combination was rejected.
        if let Err(error) = SchemaCompatibility::can_read(&writer, &reader) {
            panic!(
                "reader {:?} should be able to read writer {:?}, but got: {:?}",
                reader, writer, error
            );
        }
    }
}
952
953    fn writer_schema() -> Schema {
954        Schema::parse_str(
955            r#"
956      {"type":"record", "name":"Record", "fields":[
957        {"name":"oldfield1", "type":"int"},
958        {"name":"oldfield2", "type":"string"}
959      ]}
960"#,
961        )
962        .unwrap()
963    }
964
// A reader that only knows `oldfield1` can read the two-field writer, but the
// reverse direction fails because `oldfield2` has no default.
#[test]
fn test_missing_field() -> TestResult {
    let narrowed = Schema::parse_str(
        r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"}
      ]}
"#,
    )?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &narrowed);
    assert!(forward.is_ok());

    let backward = SchemaCompatibility::can_read(&narrowed, &writer_schema());
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield2".to_string()),
        backward.unwrap_err()
    );

    Ok(())
}
982
// A reader that only knows `oldfield2` can read the two-field writer, but the
// reverse direction fails because `oldfield1` has no default.
#[test]
fn test_missing_second_field() -> TestResult {
    let narrowed = Schema::parse_str(
        r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield2", "type":"string"}
        ]}
"#,
    )?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &narrowed);
    assert!(forward.is_ok());

    let backward = SchemaCompatibility::can_read(&narrowed, &writer_schema());
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield1".to_string()),
        backward.unwrap_err()
    );

    Ok(())
}
1000
// A reader with the same two fields as the writer is readable in both directions.
#[test]
fn test_all_fields() -> TestResult {
    let same_fields = Schema::parse_str(
        r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"oldfield2", "type":"string"}
        ]}
"#,
    )?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &same_fields);
    assert!(forward.is_ok());

    let backward = SchemaCompatibility::can_read(&same_fields, &writer_schema());
    assert!(backward.is_ok());

    Ok(())
}
1016
// A reader field the writer lacks is fine when it has a default (`newfield1`),
// while the reverse direction fails on `oldfield2`, which has none.
#[test]
fn test_new_field_with_default() -> TestResult {
    let evolved = Schema::parse_str(
        r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int", "default":42}
        ]}
"#,
    )?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &evolved);
    assert!(forward.is_ok());

    let backward = SchemaCompatibility::can_read(&evolved, &writer_schema());
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield2".to_string()),
        backward.unwrap_err()
    );

    Ok(())
}
1035
// A reader field without a default that the writer lacks breaks compatibility
// in both directions, each reporting the field missing on the other side.
#[test]
fn test_new_field() -> TestResult {
    let evolved = Schema::parse_str(
        r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int"}
        ]}
"#,
    )?;

    let forward_error = SchemaCompatibility::can_read(&writer_schema(), &evolved).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue("newfield1".to_string()),
        forward_error
    );

    let backward_error = SchemaCompatibility::can_read(&evolved, &writer_schema()).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield2".to_string()),
        backward_error
    );

    Ok(())
}
1057
// A string-array writer is fully readable by a string-array reader and is
// rejected with `WrongType` by a string-map reader.
#[test]
fn test_array_writer_schema() {
    let array_reader = string_array_schema();
    let map_reader = string_map_schema();

    let level = SchemaCompatibility::can_read(&string_array_schema(), &array_reader).unwrap();
    assert_eq!(Compatibility::Full, level);

    let mismatch = SchemaCompatibility::can_read(&string_array_schema(), &map_reader);
    assert!(matches!(
        mismatch,
        Err(CompatibilityError::WrongType { .. })
    ));
}
1072
// A string writer is readable by a string reader; an int writer read as a
// string is rejected with `WrongType`.
#[test]
fn test_primitive_writer_schema() {
    let string_reader = Schema::String;
    let same_type = SchemaCompatibility::can_read(&Schema::String, &string_reader);
    assert!(same_type.is_ok());

    let expected = CompatibilityError::WrongType {
        writer_schema_type: String::from("Int"),
        reader_schema_type: String::from("String"),
    };
    let actual = SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err();
    assert_eq!(expected, actual);
}
1085
// A reader union covering only some writer branches is partially compatible;
// with the roles swapped (reader covers every writer branch) it is fully compatible.
#[test]
fn test_union_reader_writer_subset_incompatibility() {
    let int_or_string = union_schema(vec![Schema::Int, Schema::String]);
    let string_only = union_schema(vec![Schema::String]);

    let narrowing = SchemaCompatibility::can_read(&int_or_string, &string_only).unwrap();
    assert_eq!(Compatibility::Partial, narrowing);

    let widening = SchemaCompatibility::can_read(&string_only, &int_or_string).unwrap();
    assert_eq!(Compatibility::Full, widening);
}
1101
// Two records with the same name whose `field1` types differ (string vs int)
// must fail with a `FieldTypeMismatch` wrapping the underlying `WrongType`.
#[test]
fn test_incompatible_record_field() -> TestResult {
    let string_schema = Schema::parse_str(
        r#"
        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
            {"name":"field1", "type":"string"}
        ]}
        "#,
    )?;

    let int_schema = Schema::parse_str(
        r#"
              {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
                {"name":"field1", "type":"int"}
              ]}
        "#,
    )?;

    let inner = CompatibilityError::WrongType {
        writer_schema_type: String::from("String"),
        reader_schema_type: String::from("Int"),
    };
    let expected = CompatibilityError::FieldTypeMismatch(String::from("field1"), Box::new(inner));

    let actual = SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err();
    assert_eq!(expected, actual);

    Ok(())
}
1133
// Reading an enum written with symbols A,B,C using a reader that only has A,B
// is partially compatible; the reverse direction is fully compatible.
#[test]
fn test_enum_symbols() -> TestResult {
    let two_symbols = Schema::parse_str(
        r#"
      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
"#,
    )?;
    let three_symbols =
        Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;

    let narrowing = SchemaCompatibility::can_read(&three_symbols, &two_symbols)?;
    assert_eq!(Compatibility::Partial, narrowing);

    let widening = SchemaCompatibility::can_read(&two_symbols, &three_symbols)?;
    assert_eq!(Compatibility::Full, widening);

    Ok(())
}
1154
1155    fn point_2d_schema() -> Schema {
1156        Schema::parse_str(
1157            r#"
1158      {"type":"record", "name":"Point2D", "fields":[
1159        {"name":"x", "type":"double"},
1160        {"name":"y", "type":"double"}
1161      ]}
1162    "#,
1163        )
1164        .unwrap()
1165    }
1166
1167    fn point_2d_fullname_schema() -> Schema {
1168        Schema::parse_str(
1169            r#"
1170      {"type":"record", "name":"Point", "namespace":"written", "fields":[
1171        {"name":"x", "type":"double"},
1172        {"name":"y", "type":"double"}
1173      ]}
1174    "#,
1175        )
1176        .unwrap()
1177    }
1178
1179    fn point_3d_no_default_schema() -> Schema {
1180        Schema::parse_str(
1181            r#"
1182      {"type":"record", "name":"Point", "fields":[
1183        {"name":"x", "type":"double"},
1184        {"name":"y", "type":"double"},
1185        {"name":"z", "type":"double"}
1186      ]}
1187    "#,
1188        )
1189        .unwrap()
1190    }
1191
1192    fn point_3d_schema() -> Schema {
1193        Schema::parse_str(
1194            r#"
1195      {"type":"record", "name":"Point3D", "fields":[
1196        {"name":"x", "type":"double"},
1197        {"name":"y", "type":"double"},
1198        {"name":"z", "type":"double", "default": 0.0}
1199      ]}
1200    "#,
1201        )
1202        .unwrap()
1203    }
1204
1205    fn point_3d_match_name_schema() -> Schema {
1206        Schema::parse_str(
1207            r#"
1208      {"type":"record", "name":"Point", "fields":[
1209        {"name":"x", "type":"double"},
1210        {"name":"y", "type":"double"},
1211        {"name":"z", "type":"double", "default": 0.0}
1212      ]}
1213    "#,
1214        )
1215        .unwrap()
1216    }
1217
// The reader union has a record with a matching short name but no matching
// structure, so every union element is rejected.
#[test]
fn test_union_resolution_no_structure_match() {
    let reader_union = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);

    let error = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &reader_union)
        .unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, error);
}
1227
// The reader union has several structurally matching records (2D listed before
// the defaulted 3D) but no name match, so every union element is rejected.
#[test]
fn test_union_resolution_first_structure_match_2d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_2d_schema(),
        point_3d_schema(),
    ];
    let reader_union = union_schema(branches);

    let error = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &reader_union)
        .unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, error);
}
1242
// The reader union has several structurally matching records (defaulted 3D
// listed before 2D) but no name match, so every union element is rejected.
#[test]
fn test_union_resolution_first_structure_match_3d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_3d_schema(),
        point_2d_schema(),
    ];
    let reader_union = union_schema(branches);

    let error = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &reader_union)
        .unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, error);
}
1257
// The reader union has several structurally matching records and one with a
// matching short name; the test expects every union element to be rejected.
#[test]
fn test_union_resolution_named_structure_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
    ];
    let reader_union = union_schema(branches);

    let error = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &reader_union)
        .unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, error);
}
1272
// The reader union contains a record whose full name matches the writer, so
// the check succeeds.
#[test]
fn test_union_resolution_full_name_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
        point_2d_fullname_schema(),
    ];
    let reader_union = union_schema(branches);

    let outcome = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &reader_union);
    assert!(outcome.is_ok());
}
1285
// AVRO-3772: writes a record whose enum field holds "clubs", then reads it back
// with a reader schema whose enum lacks "clubs" but declares the default
// "spades". The value read must be the reader's default, `Enum(1, "spades")`.
#[test]
fn test_avro_3772_enum_default() -> TestResult {
    let writer_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                "type": "enum",
                "name": "suit",
                "symbols": ["diamonds", "spades", "clubs", "hearts"],
                "default": "spades"
              }
            }
          ]
        }
        "#;

    let reader_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                 "type": "enum",
                 "name": "suit",
                 "symbols": ["diamonds", "spades", "ninja", "hearts"],
                 "default": "spades"
              }
            }
          ]
        }
      "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
    // `Record::new` yields an `Option`, so it cannot be propagated with `?`.
    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "clubs");
    // Propagate Avro errors with `?`, like the other fallible calls in this test.
    writer.append_value(record)?;
    let input = writer.into_inner()?;
    let mut reader = Reader::builder(&input[..])
        .reader_schema(&reader_schema)
        .build()?;
    // Exactly one record was written; a missing record is a test bug, hence `unwrap`.
    let first = reader.next().unwrap()?;
    assert_eq!(
        first,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(1, "spades".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1351
// AVRO-3772: writes a record whose enum field holds "hearts", then reads it back
// with a reader schema whose enum has fewer symbols (["hearts", "spades"]).
// "hearts" exists in the reader, so it must be read as `Enum(0, "hearts")`.
#[test]
fn test_avro_3772_enum_default_less_symbols() -> TestResult {
    let writer_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                "type": "enum",
                "name": "suit",
                "symbols": ["diamonds", "spades", "clubs", "hearts"],
                "default": "spades"
              }
            }
          ]
        }
        "#;

    let reader_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                 "type": "enum",
                  "name": "suit",
                  "symbols": ["hearts", "spades"],
                  "default": "spades"
              }
            }
          ]
        }
      "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
    // `Record::new` yields an `Option`, so it cannot be propagated with `?`.
    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "hearts");
    // Propagate Avro errors with `?`, like the other fallible calls in this test.
    writer.append_value(record)?;
    let input = writer.into_inner()?;
    let mut reader = Reader::builder(&input[..])
        .reader_schema(&reader_schema)
        .build()?;
    // Exactly one record was written; a missing record is a test bug, hence `unwrap`.
    let first = reader.next().unwrap()?;
    assert_eq!(
        first,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(0, "hearts".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1417
// AVRO-3894: adding an alias (`time`) to the `date` field must keep the two
// record schemas mutually readable.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
{
    let without_alias = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date"}
            ]
        }"#,
    )?;

    let with_alias = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date", "aliases" : [ "time" ]}
            ]
        }"#,
    )?;

    let outcome = SchemaCompatibility::mutual_read(&without_alias, &with_alias);
    assert!(outcome.is_ok());

    Ok(())
}
1451
// AVRO-3917: a reader whose `date` field is aliased to `time` can fully read a
// writer that names the field `time`; the reverse direction fails because the
// reader's `time` field has no default.
#[test]
fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
    let aliased = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date", "aliases" : [ "time" ]}
            ]
        }"#,
    )?;

    let renamed = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "time"}
            ]
        }"#,
    )?;

    let via_alias = SchemaCompatibility::can_read(&renamed, &aliased)?;
    assert_eq!(Compatibility::Full, via_alias);

    let without_alias = SchemaCompatibility::can_read(&aliased, &renamed).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue("time".to_string()),
        without_alias
    );

    Ok(())
}
1491
// AVRO-3898: record, enum and fixed schemas that differ only by namespace
// (same unqualified name) must still be readable.
#[test]
fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
    // Each pair is (writer without namespace, reader in `my.namespace`).
    let pairs = [
        // Record schemas
        (
            Schema::parse_str(
                r#"{
              "type": "record",
              "name": "Statistics",
              "fields": [
                { "name": "success", "type": "int" },
                { "name": "fail", "type": "int" },
                { "name": "time", "type": "string" },
                { "name": "max", "type": "int", "default": 0 }
              ]
            }"#,
            )?,
            Schema::parse_str(
                r#"{
              "type": "record",
              "name": "Statistics",
              "namespace": "my.namespace",
              "fields": [
                { "name": "success", "type": "int" },
                { "name": "fail", "type": "int" },
                { "name": "time", "type": "string" },
                { "name": "average", "type": "int", "default": 0}
              ]
            }"#,
            )?,
        ),
        // Enum schemas
        (
            Schema::parse_str(
                r#"{
                    "type": "enum",
                    "name": "Suit",
                    "symbols": ["diamonds", "spades", "clubs"]
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                    "type": "enum",
                    "name": "Suit",
                    "namespace": "my.namespace",
                    "symbols": ["diamonds", "spades", "clubs", "hearts"]
                }"#,
            )?,
        ),
        // Fixed schemas
        (
            Schema::parse_str(
                r#"{
                    "type": "fixed",
                    "name": "EmployeeId",
                    "size": 16
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                    "type": "fixed",
                    "name": "EmployeeId",
                    "namespace": "my.namespace",
                    "size": 16
                }"#,
            )?,
        ),
    ];

    for (unqualified, qualified) in pairs {
        let outcome = SchemaCompatibility::can_read(&unqualified, &qualified);
        assert!(outcome.is_ok());
    }

    Ok(())
}
1567
// Widening a map/array field into a nullable union is fully compatible, while
// reading the nullable variant with the non-nullable schema is only partial.
#[test]
fn test_can_read_compatibility_errors() -> TestResult {
    // Each pair is (schema with required field, schema with nullable field).
    let pairs = [
        (
            Schema::parse_str(
                r#"{
                    "type": "record",
                    "name": "StatisticsMap",
                    "fields": [
                        {"name": "average", "type": "int", "default": 0},
                        {"name": "success", "type": {"type": "map", "values": "int"}}
                    ]
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                    "type": "record",
                    "name": "StatisticsMap",
                    "fields": [
                        {"name": "average", "type": "int", "default": 0},
                        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
                    ]
                }"#,
            )?,
        ),
        (
            Schema::parse_str(
                r#"{
                        "type": "record",
                        "name": "StatisticsArray",
                        "fields": [
                            {"name": "max_values", "type": {"type": "array", "items": "int"}}
                        ]
                    }"#,
            )?,
            Schema::parse_str(
                r#"{
                        "type": "record",
                        "name": "StatisticsArray",
                        "fields": [
                            {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
                        ]
                    }"#,
            )?,
        ),
    ];

    for (required, nullable) in pairs {
        let widening = SchemaCompatibility::can_read(&required, &nullable).unwrap();
        assert_eq!(Compatibility::Full, widening);

        let narrowing = SchemaCompatibility::can_read(&nullable, &required).unwrap();
        assert_eq!(Compatibility::Partial, narrowing);
    }

    Ok(())
}
1628
1629    #[test]
1630    fn avro_3974_can_read_schema_references() -> TestResult {
1631        let schema_strs = vec![
1632            r#"{
1633          "type": "record",
1634          "name": "Child",
1635          "namespace": "avro",
1636          "fields": [
1637            {
1638              "name": "val",
1639              "type": "int"
1640            }
1641          ]
1642        }
1643        "#,
1644            r#"{
1645          "type": "record",
1646          "name": "Parent",
1647          "namespace": "avro",
1648          "fields": [
1649            {
1650              "name": "child",
1651              "type": "avro.Child"
1652            }
1653          ]
1654        }
1655        "#,
1656        ];
1657
1658        let schemas = Schema::parse_list(schema_strs).unwrap();
1659        SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1660
1661        Ok(())
1662    }
1663
1664    #[test]
1665    fn duration_and_fixed_of_different_size() -> TestResult {
1666        let schema_strs = vec![
1667            r#"{
1668          "type": "fixed",
1669          "name": "Fixed25",
1670          "size": 25
1671        }
1672        "#,
1673            r#"{
1674          "type": "fixed",
1675          "logicalType": "duration",
1676          "name": "Duration",
1677          "size": 12
1678        }
1679        "#,
1680        ];
1681
1682        let schemas = Schema::parse_list(schema_strs).unwrap();
1683        assert!(SchemaCompatibility::can_read(&schemas[0], &schemas[1]).is_err());
1684        assert!(SchemaCompatibility::can_read(&schemas[1], &schemas[0]).is_err());
1685        SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1686        SchemaCompatibility::can_read(&schemas[0], &schemas[0])?;
1687
1688        Ok(())
1689    }
1690
1691    #[test]
1692    fn avro_rs_342_decimal_fixed_and_bytes() -> TestResult {
1693        let bytes = Schema::Decimal(DecimalSchema {
1694            precision: 20,
1695            scale: 0,
1696            inner: InnerDecimalSchema::Bytes,
1697        });
1698        let fixed = Schema::Decimal(DecimalSchema {
1699            precision: 20,
1700            scale: 0,
1701            inner: InnerDecimalSchema::Fixed(FixedSchema {
1702                name: Name::new("DecimalFixed")?,
1703                aliases: None,
1704                doc: None,
1705                size: 20,
1706                attributes: BTreeMap::default(),
1707            }),
1708        });
1709
1710        assert_eq!(
1711            Compatibility::Full,
1712            SchemaCompatibility::mutual_read(&bytes, &fixed)?
1713        );
1714
1715        let value = Value::Decimal(Decimal::from(vec![1; 10]));
1716        let fixed_value = value.clone().resolve(&fixed)?;
1717        let bytes_value = value.resolve(&bytes)?;
1718
1719        assert_eq!(fixed_value, bytes_value);
1720
1721        Ok(())
1722    }
1723}