apache_avro/
schema_compatibility.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for checking schema compatibility
19use crate::{
20    error::CompatibilityError,
21    schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
22};
23use std::{
24    collections::{hash_map::DefaultHasher, HashSet},
25    hash::Hasher,
26    ptr,
27};
28
29fn match_ref_schemas(
30    writers_schema: &Schema,
31    readers_schema: &Schema,
32) -> Result<(), CompatibilityError> {
33    match (readers_schema, writers_schema) {
34        (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
35            if r_name == w_name {
36                Ok(())
37            } else {
38                Err(CompatibilityError::NameMismatch {
39                    writer_name: w_name.fullname(None),
40                    reader_name: r_name.fullname(None),
41                })
42            }
43        }
44        _ => Err(CompatibilityError::WrongType {
45            writer_schema_type: format!("{:#?}", writers_schema),
46            reader_schema_type: format!("{:#?}", readers_schema),
47        }),
48    }
49}
50
/// Entry point for Avro schema compatibility checks.
///
/// The type carries no state; the checks are exposed as associated functions
/// (`can_read`, `mutual_read`).
#[derive(Debug, Clone, Copy, Default)]
pub struct SchemaCompatibility;

/// Stateful helper that walks a writer/reader schema pair recursively.
#[derive(Debug)]
struct Checker {
    /// (writer, reader) pairs already visited, each schema identified by a hash of its
    /// address; used to stop re-checking a pair that has been seen before.
    recursion: HashSet<(u64, u64)>,
}
56
57impl Checker {
58    /// Create a new checker, with recursion set to an empty set.
59    pub(crate) fn new() -> Self {
60        Self {
61            recursion: HashSet::new(),
62        }
63    }
64
65    pub(crate) fn can_read(
66        &mut self,
67        writers_schema: &Schema,
68        readers_schema: &Schema,
69    ) -> Result<(), CompatibilityError> {
70        self.full_match_schemas(writers_schema, readers_schema)
71    }
72
73    pub(crate) fn full_match_schemas(
74        &mut self,
75        writers_schema: &Schema,
76        readers_schema: &Schema,
77    ) -> Result<(), CompatibilityError> {
78        if self.recursion_in_progress(writers_schema, readers_schema) {
79            return Ok(());
80        }
81
82        SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
83
84        let w_type = SchemaKind::from(writers_schema);
85        let r_type = SchemaKind::from(readers_schema);
86
87        if w_type != SchemaKind::Union
88            && (r_type.is_primitive()
89                || r_type == SchemaKind::Fixed
90                || r_type == SchemaKind::Uuid
91                || r_type == SchemaKind::Date
92                || r_type == SchemaKind::TimeMillis
93                || r_type == SchemaKind::TimeMicros
94                || r_type == SchemaKind::TimestampMillis
95                || r_type == SchemaKind::TimestampMicros
96                || r_type == SchemaKind::TimestampNanos
97                || r_type == SchemaKind::LocalTimestampMillis
98                || r_type == SchemaKind::LocalTimestampMicros
99                || r_type == SchemaKind::LocalTimestampNanos)
100        {
101            return Ok(());
102        }
103
104        match r_type {
105            SchemaKind::Ref => match_ref_schemas(writers_schema, readers_schema),
106            SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
107            SchemaKind::Map => {
108                if let Schema::Map(w_m) = writers_schema {
109                    match readers_schema {
110                        Schema::Map(r_m) => self.full_match_schemas(&w_m.types, &r_m.types),
111                        _ => Err(CompatibilityError::WrongType {
112                            writer_schema_type: format!("{:#?}", writers_schema),
113                            reader_schema_type: format!("{:#?}", readers_schema),
114                        }),
115                    }
116                } else {
117                    Err(CompatibilityError::TypeExpected {
118                        schema_type: String::from("writers_schema"),
119                        expected_type: vec![SchemaKind::Record],
120                    })
121                }
122            }
123            SchemaKind::Array => {
124                if let Schema::Array(w_a) = writers_schema {
125                    match readers_schema {
126                        Schema::Array(r_a) => self.full_match_schemas(&w_a.items, &r_a.items),
127                        _ => Err(CompatibilityError::WrongType {
128                            writer_schema_type: format!("{:#?}", writers_schema),
129                            reader_schema_type: format!("{:#?}", readers_schema),
130                        }),
131                    }
132                } else {
133                    Err(CompatibilityError::TypeExpected {
134                        schema_type: String::from("writers_schema"),
135                        expected_type: vec![SchemaKind::Array],
136                    })
137                }
138            }
139            SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
140            SchemaKind::Enum => {
141                // reader's symbols must contain all writer's symbols
142                if let Schema::Enum(EnumSchema {
143                    symbols: w_symbols, ..
144                }) = writers_schema
145                {
146                    if let Schema::Enum(EnumSchema {
147                        symbols: r_symbols, ..
148                    }) = readers_schema
149                    {
150                        if w_symbols.iter().all(|e| r_symbols.contains(e)) {
151                            return Ok(());
152                        }
153                    }
154                }
155                Err(CompatibilityError::MissingSymbols)
156            }
157            _ => {
158                if w_type == SchemaKind::Union {
159                    if let Schema::Union(r) = writers_schema {
160                        if r.schemas.len() == 1 {
161                            return self.full_match_schemas(&r.schemas[0], readers_schema);
162                        }
163                    }
164                }
165                Err(CompatibilityError::Inconclusive(String::from(
166                    "writers_schema",
167                )))
168            }
169        }
170    }
171
172    fn match_record_schemas(
173        &mut self,
174        writers_schema: &Schema,
175        readers_schema: &Schema,
176    ) -> Result<(), CompatibilityError> {
177        let w_type = SchemaKind::from(writers_schema);
178
179        if w_type == SchemaKind::Union {
180            return Err(CompatibilityError::TypeExpected {
181                schema_type: String::from("writers_schema"),
182                expected_type: vec![SchemaKind::Record],
183            });
184        }
185
186        if let Schema::Record(RecordSchema {
187            fields: w_fields,
188            lookup: w_lookup,
189            ..
190        }) = writers_schema
191        {
192            if let Schema::Record(RecordSchema {
193                fields: r_fields, ..
194            }) = readers_schema
195            {
196                for field in r_fields.iter() {
197                    // get all field names in a vector (field.name + aliases)
198                    let mut fields_names = vec![&field.name];
199                    if let Some(ref aliases) = field.aliases {
200                        for alias in aliases {
201                            fields_names.push(alias);
202                        }
203                    }
204
205                    // Check whether any of the possible fields names are in the writer schema.
206                    // If the field was found, then it must have the exact same name with the writer field,
207                    // otherwise we would have a false positive with the writers aliases
208                    let position = fields_names.iter().find_map(|field_name| {
209                        if let Some(pos) = w_lookup.get(*field_name) {
210                            if &w_fields[*pos].name == *field_name {
211                                return Some(pos);
212                            }
213                        }
214                        None
215                    });
216
217                    match position {
218                        Some(pos) => {
219                            if let Err(err) =
220                                self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
221                            {
222                                return Err(CompatibilityError::FieldTypeMismatch(
223                                    field.name.clone(),
224                                    Box::new(err),
225                                ));
226                            }
227                        }
228                        _ => {
229                            if field.default.is_none() {
230                                return Err(CompatibilityError::MissingDefaultValue(
231                                    field.name.clone(),
232                                ));
233                            }
234                        }
235                    }
236                }
237            }
238        }
239        Ok(())
240    }
241
242    fn match_union_schemas(
243        &mut self,
244        writers_schema: &Schema,
245        readers_schema: &Schema,
246    ) -> Result<(), CompatibilityError> {
247        if let Schema::Union(u) = writers_schema {
248            if u.schemas
249                .iter()
250                .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok())
251            {
252                return Ok(());
253            } else {
254                return Err(CompatibilityError::MissingUnionElements);
255            }
256        } else if let Schema::Union(u) = readers_schema {
257            // This check is needed because the writer_schema can be not union
258            // but the type can be contain in the union of the reader schema
259            // e.g. writer_schema is string and reader_schema is [string, int]
260            if u.schemas
261                .iter()
262                .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok())
263            {
264                return Ok(());
265            }
266        }
267        Err(CompatibilityError::MissingUnionElements)
268    }
269
270    fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
271        let mut hasher = DefaultHasher::new();
272        ptr::hash(writers_schema, &mut hasher);
273        let w_hash = hasher.finish();
274
275        hasher = DefaultHasher::new();
276        ptr::hash(readers_schema, &mut hasher);
277        let r_hash = hasher.finish();
278
279        let key = (w_hash, r_hash);
280        // This is a shortcut to add if not exists *and* return false. It will return true
281        // if it was able to insert.
282        !self.recursion.insert(key)
283    }
284}
285
286impl SchemaCompatibility {
287    /// `can_read` performs a full, recursive check that a datum written using the
288    /// writers_schema can be read using the readers_schema.
289    pub fn can_read(
290        writers_schema: &Schema,
291        readers_schema: &Schema,
292    ) -> Result<(), CompatibilityError> {
293        let mut c = Checker::new();
294        c.can_read(writers_schema, readers_schema)
295    }
296
297    /// `mutual_read` performs a full, recursive check that a datum written using either
298    /// the writers_schema or the readers_schema can be read using the other schema.
299    pub fn mutual_read(
300        writers_schema: &Schema,
301        readers_schema: &Schema,
302    ) -> Result<(), CompatibilityError> {
303        SchemaCompatibility::can_read(writers_schema, readers_schema)?;
304        SchemaCompatibility::can_read(readers_schema, writers_schema)
305    }
306
307    ///  `match_schemas` performs a basic check that a datum written with the
308    ///  writers_schema could be read using the readers_schema. This check only includes
309    ///  matching the types, including schema promotion, and matching the full name for
310    ///  named types. Aliases for named types are not supported here, and the rust
311    ///  implementation of Avro in general does not include support for aliases (I think).
312    pub(crate) fn match_schemas(
313        writers_schema: &Schema,
314        readers_schema: &Schema,
315    ) -> Result<(), CompatibilityError> {
316        fn check_reader_type_multi(
317            reader_type: SchemaKind,
318            allowed_reader_types: Vec<SchemaKind>,
319            writer_type: SchemaKind,
320        ) -> Result<(), CompatibilityError> {
321            if allowed_reader_types.iter().any(|&t| t == reader_type) {
322                Ok(())
323            } else {
324                let mut allowed_types: Vec<SchemaKind> = vec![writer_type];
325                allowed_types.extend_from_slice(allowed_reader_types.as_slice());
326                Err(CompatibilityError::TypeExpected {
327                    schema_type: String::from("readers_schema"),
328                    expected_type: allowed_types,
329                })
330            }
331        }
332
333        fn check_reader_type(
334            reader_type: SchemaKind,
335            allowed_reader_type: SchemaKind,
336            writer_type: SchemaKind,
337        ) -> Result<(), CompatibilityError> {
338            if reader_type == allowed_reader_type {
339                Ok(())
340            } else {
341                Err(CompatibilityError::TypeExpected {
342                    schema_type: String::from("readers_schema"),
343                    expected_type: vec![writer_type, allowed_reader_type],
344                })
345            }
346        }
347
348        fn check_writer_type(
349            writers_schema: &Schema,
350            allowed_schema: &Schema,
351            expected_schema_types: Vec<SchemaKind>,
352        ) -> Result<(), CompatibilityError> {
353            if *allowed_schema == *writers_schema {
354                Ok(())
355            } else {
356                Err(CompatibilityError::TypeExpected {
357                    schema_type: String::from("writers_schema"),
358                    expected_type: expected_schema_types,
359                })
360            }
361        }
362
363        let w_type = SchemaKind::from(writers_schema);
364        let r_type = SchemaKind::from(readers_schema);
365
366        if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
367            return Ok(());
368        }
369
370        if w_type == r_type {
371            if r_type.is_primitive() {
372                return Ok(());
373            }
374
375            match r_type {
376                SchemaKind::Record | SchemaKind::Enum => {
377                    let msg = format!("A {} type must always has a name", readers_schema);
378                    let writers_name = writers_schema.name().expect(&msg);
379                    let readers_name = readers_schema.name().expect(&msg);
380
381                    if writers_name.name == readers_name.name {
382                        return Ok(());
383                    }
384
385                    return Err(CompatibilityError::NameMismatch {
386                        writer_name: writers_name.name.clone(),
387                        reader_name: readers_name.name.clone(),
388                    });
389                }
390                SchemaKind::Fixed => {
391                    if let Schema::Fixed(FixedSchema {
392                        name: w_name,
393                        aliases: _,
394                        doc: _w_doc,
395                        size: w_size,
396                        default: _w_default,
397                        attributes: _,
398                    }) = writers_schema
399                    {
400                        if let Schema::Fixed(FixedSchema {
401                            name: r_name,
402                            aliases: _,
403                            doc: _r_doc,
404                            size: r_size,
405                            default: _r_default,
406                            attributes: _,
407                        }) = readers_schema
408                        {
409                            return (w_name.name == r_name.name && w_size == r_size)
410                                .then_some(())
411                                .ok_or(CompatibilityError::FixedMismatch);
412                        }
413                    }
414                }
415                SchemaKind::Map => {
416                    if let Schema::Map(w_m) = writers_schema {
417                        if let Schema::Map(r_m) = readers_schema {
418                            return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
419                        }
420                    }
421                }
422                SchemaKind::Array => {
423                    if let Schema::Array(w_a) = writers_schema {
424                        if let Schema::Array(r_a) = readers_schema {
425                            return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
426                        }
427                    }
428                }
429                SchemaKind::Uuid => {
430                    return check_writer_type(
431                        writers_schema,
432                        readers_schema,
433                        vec![r_type, SchemaKind::String],
434                    );
435                }
436                SchemaKind::Date | SchemaKind::TimeMillis => {
437                    return check_writer_type(
438                        writers_schema,
439                        readers_schema,
440                        vec![r_type, SchemaKind::Int],
441                    );
442                }
443                SchemaKind::TimeMicros
444                | SchemaKind::TimestampNanos
445                | SchemaKind::TimestampMillis
446                | SchemaKind::TimestampMicros
447                | SchemaKind::LocalTimestampMillis
448                | SchemaKind::LocalTimestampMicros
449                | SchemaKind::LocalTimestampNanos => {
450                    return check_writer_type(
451                        writers_schema,
452                        readers_schema,
453                        vec![r_type, SchemaKind::Long],
454                    );
455                }
456                SchemaKind::Duration => {
457                    return Ok(());
458                }
459                SchemaKind::Ref => return match_ref_schemas(writers_schema, readers_schema),
460                _ => {
461                    return Err(CompatibilityError::Inconclusive(String::from(
462                        "readers_schema",
463                    )))
464                }
465            };
466        }
467
468        // Here are the checks for primitive types
469        match w_type {
470            SchemaKind::Int => check_reader_type_multi(
471                r_type,
472                vec![
473                    SchemaKind::Long,
474                    SchemaKind::Float,
475                    SchemaKind::Double,
476                    SchemaKind::Date,
477                    SchemaKind::TimeMillis,
478                ],
479                w_type,
480            ),
481            SchemaKind::Long => check_reader_type_multi(
482                r_type,
483                vec![
484                    SchemaKind::Float,
485                    SchemaKind::Double,
486                    SchemaKind::TimeMicros,
487                    SchemaKind::TimestampMillis,
488                    SchemaKind::TimestampMicros,
489                    SchemaKind::TimestampNanos,
490                    SchemaKind::LocalTimestampMillis,
491                    SchemaKind::LocalTimestampMicros,
492                    SchemaKind::LocalTimestampNanos,
493                ],
494                w_type,
495            ),
496            SchemaKind::Float => {
497                check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type)
498            }
499            SchemaKind::String => {
500                check_reader_type_multi(r_type, vec![SchemaKind::Bytes, SchemaKind::Uuid], w_type)
501            }
502            SchemaKind::Bytes => check_reader_type(r_type, SchemaKind::String, w_type),
503            SchemaKind::Uuid => check_reader_type(r_type, SchemaKind::String, w_type),
504            SchemaKind::Date | SchemaKind::TimeMillis => {
505                check_reader_type(r_type, SchemaKind::Int, w_type)
506            }
507            SchemaKind::TimeMicros
508            | SchemaKind::TimestampMicros
509            | SchemaKind::TimestampMillis
510            | SchemaKind::TimestampNanos
511            | SchemaKind::LocalTimestampMillis
512            | SchemaKind::LocalTimestampMicros
513            | SchemaKind::LocalTimestampNanos => {
514                check_reader_type(r_type, SchemaKind::Long, w_type)
515            }
516            _ => Err(CompatibilityError::Inconclusive(String::from(
517                "writers_schema",
518            ))),
519        }
520    }
521}
522
523#[cfg(test)]
524mod tests {
525    use super::*;
526    use crate::{
527        types::{Record, Value},
528        Codec, Reader, Writer,
529    };
530    use apache_avro_test_helper::TestResult;
531    use rstest::*;
532
533    fn int_array_schema() -> Schema {
534        Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
535    }
536
537    fn long_array_schema() -> Schema {
538        Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
539    }
540
541    fn string_array_schema() -> Schema {
542        Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
543    }
544
545    fn int_map_schema() -> Schema {
546        Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
547    }
548
549    fn long_map_schema() -> Schema {
550        Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
551    }
552
553    fn string_map_schema() -> Schema {
554        Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
555    }
556
557    fn enum1_ab_schema() -> Schema {
558        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
559    }
560
561    fn enum1_abc_schema() -> Schema {
562        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
563    }
564
565    fn enum1_bc_schema() -> Schema {
566        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
567    }
568
569    fn enum2_ab_schema() -> Schema {
570        Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
571    }
572
573    fn empty_record1_schema() -> Schema {
574        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
575    }
576
577    fn empty_record2_schema() -> Schema {
578        Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
579    }
580
581    fn a_int_record1_schema() -> Schema {
582        Schema::parse_str(
583            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
584        )
585        .unwrap()
586    }
587
588    fn a_long_record1_schema() -> Schema {
589        Schema::parse_str(
590            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
591        )
592        .unwrap()
593    }
594
595    fn a_int_b_int_record1_schema() -> Schema {
596        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
597    }
598
599    fn a_dint_record1_schema() -> Schema {
600        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
601    }
602
603    fn a_int_b_dint_record1_schema() -> Schema {
604        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
605    }
606
607    fn a_dint_b_dint_record1_schema() -> Schema {
608        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
609    }
610
611    fn nested_record() -> Schema {
612        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
613    }
614
615    fn nested_optional_record() -> Schema {
616        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
617    }
618
619    fn int_list_record_schema() -> Schema {
620        Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
621    }
622
623    fn long_list_record_schema() -> Schema {
624        Schema::parse_str(
625            r#"
626      {
627        "type":"record", "name":"List", "fields": [
628          {"name": "head", "type": "long"},
629          {"name": "tail", "type": "array", "items": "long"}
630      ]}
631"#,
632        )
633        .unwrap()
634    }
635
636    fn union_schema(schemas: Vec<Schema>) -> Schema {
637        let schema_string = schemas
638            .iter()
639            .map(|s| s.canonical_form())
640            .collect::<Vec<String>>()
641            .join(",");
642        Schema::parse_str(&format!("[{schema_string}]")).unwrap()
643    }
644
645    fn empty_union_schema() -> Schema {
646        union_schema(vec![])
647    }
648
649    // unused
650    // fn null_union_schema() -> Schema { union_schema(vec![Schema::Null]) }
651
652    fn int_union_schema() -> Schema {
653        union_schema(vec![Schema::Int])
654    }
655
656    fn long_union_schema() -> Schema {
657        union_schema(vec![Schema::Long])
658    }
659
660    fn string_union_schema() -> Schema {
661        union_schema(vec![Schema::String])
662    }
663
664    fn int_string_union_schema() -> Schema {
665        union_schema(vec![Schema::Int, Schema::String])
666    }
667
668    fn string_int_union_schema() -> Schema {
669        union_schema(vec![Schema::String, Schema::Int])
670    }
671
672    #[test]
673    fn test_broken() {
674        assert_eq!(
675            CompatibilityError::MissingUnionElements,
676            SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema())
677                .unwrap_err()
678        )
679    }
680
681    #[test]
682    fn test_incompatible_reader_writer_pairs() {
683        let incompatible_schemas = vec![
684            // null
685            (Schema::Null, Schema::Int),
686            (Schema::Null, Schema::Long),
687            // boolean
688            (Schema::Boolean, Schema::Int),
689            // int
690            (Schema::Int, Schema::Null),
691            (Schema::Int, Schema::Boolean),
692            (Schema::Int, Schema::Long),
693            (Schema::Int, Schema::Float),
694            (Schema::Int, Schema::Double),
695            // long
696            (Schema::Long, Schema::Float),
697            (Schema::Long, Schema::Double),
698            // float
699            (Schema::Float, Schema::Double),
700            // string
701            (Schema::String, Schema::Boolean),
702            (Schema::String, Schema::Int),
703            // bytes
704            (Schema::Bytes, Schema::Null),
705            (Schema::Bytes, Schema::Int),
706            // logical types
707            (Schema::TimeMicros, Schema::Int),
708            (Schema::TimestampMillis, Schema::Int),
709            (Schema::TimestampMicros, Schema::Int),
710            (Schema::TimestampNanos, Schema::Int),
711            (Schema::LocalTimestampMillis, Schema::Int),
712            (Schema::LocalTimestampMicros, Schema::Int),
713            (Schema::LocalTimestampNanos, Schema::Int),
714            (Schema::Date, Schema::Long),
715            (Schema::TimeMillis, Schema::Long),
716            // array and maps
717            (int_array_schema(), long_array_schema()),
718            (int_map_schema(), int_array_schema()),
719            (int_array_schema(), int_map_schema()),
720            (int_map_schema(), long_map_schema()),
721            // enum
722            (enum1_ab_schema(), enum1_abc_schema()),
723            (enum1_bc_schema(), enum1_abc_schema()),
724            (enum1_ab_schema(), enum2_ab_schema()),
725            (Schema::Int, enum2_ab_schema()),
726            (enum2_ab_schema(), Schema::Int),
727            //union
728            (int_union_schema(), int_string_union_schema()),
729            (string_union_schema(), int_string_union_schema()),
730            //record
731            (empty_record2_schema(), empty_record1_schema()),
732            (a_int_record1_schema(), empty_record1_schema()),
733            (a_int_b_dint_record1_schema(), empty_record1_schema()),
734            (int_list_record_schema(), long_list_record_schema()),
735            (nested_record(), nested_optional_record()),
736        ];
737
738        assert!(incompatible_schemas
739            .iter()
740            .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err()));
741    }
742
743    #[rstest]
744    // Record type test
745    #[case(
746        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
747        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
748    )]
749    // Fixed type test
750    #[case(
751        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
752        r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
753    )]
754    // Enum type test
755    #[case(
756        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
757        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
758    )]
759    // Map type test
760    #[case(
761        r#"{"type": "map", "values": "int"}"#,
762        r#"{"type": "map", "values": "long"}"#
763    )]
764    // Date type
765    #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
766    // time-millis type
767    #[case(
768        r#"{"type": "int"}"#,
769        r#"{"type": "int", "logicalType": "time-millis"}"#
770    )]
771    // time-millis type
772    #[case(
773        r#"{"type": "long"}"#,
774        r#"{"type": "long", "logicalType": "time-micros"}"#
775    )]
776    // timetimestamp-nanos type
777    #[case(
778        r#"{"type": "long"}"#,
779        r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
780    )]
781    // timestamp-millis type
782    #[case(
783        r#"{"type": "long"}"#,
784        r#"{"type": "long", "logicalType": "timestamp-millis"}"#
785    )]
786    // timestamp-micros type
787    #[case(
788        r#"{"type": "long"}"#,
789        r#"{"type": "long", "logicalType": "timestamp-micros"}"#
790    )]
791    // local-timestamp-millis type
792    #[case(
793        r#"{"type": "long"}"#,
794        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
795    )]
796    // local-timestamp-micros type
797    #[case(
798        r#"{"type": "long"}"#,
799        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
800    )]
801    // local-timestamp-nanos type
802    #[case(
803        r#"{"type": "long"}"#,
804        r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
805    )]
806    // Array type test
807    #[case(
808        r#"{"type": "array", "items": "int"}"#,
809        r#"{"type": "array", "items": "long"}"#
810    )]
811    fn test_avro_3950_match_schemas_ok(
812        #[case] writer_schema_str: &str,
813        #[case] reader_schema_str: &str,
814    ) {
815        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
816        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
817
818        assert!(SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).is_ok());
819    }
820
821    #[rstest]
822    // Record type test
823    #[case(
824        r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
825        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
826        CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
827    )]
828    // Fixed type test
829    #[case(
830        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
831        r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
832        CompatibilityError::FixedMismatch
833    )]
834    // Enum type test
835    #[case(
836        r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
837        r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
838        CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
839    )]
840    // Map type test
841    #[case(
842        r#"{"type":"map", "values": "long"}"#,
843        r#"{"type":"map", "values": "int"}"#,
844        CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
845            SchemaKind::Long,
846            SchemaKind::Float,
847            SchemaKind::Double,
848            SchemaKind::TimeMicros,
849            SchemaKind::TimestampMillis,
850            SchemaKind::TimestampMicros,
851            SchemaKind::TimestampNanos,
852            SchemaKind::LocalTimestampMillis,
853            SchemaKind::LocalTimestampMicros,
854            SchemaKind::LocalTimestampNanos,
855        ]}
856    )]
857    // Array type test
858    #[case(
859        r#"{"type": "array", "items": "long"}"#,
860        r#"{"type": "array", "items": "int"}"#,
861        CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
862            SchemaKind::Long,
863            SchemaKind::Float,
864            SchemaKind::Double,
865            SchemaKind::TimeMicros,
866            SchemaKind::TimestampMillis,
867            SchemaKind::TimestampMicros,
868            SchemaKind::TimestampNanos,
869            SchemaKind::LocalTimestampMillis,
870            SchemaKind::LocalTimestampMicros,
871            SchemaKind::LocalTimestampNanos,
872        ]}
873    )]
874    // Date type test
875    #[case(
876        r#"{"type": "string"}"#,
877        r#"{"type": "int", "logicalType": "date"}"#,
878        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
879            SchemaKind::String,
880            SchemaKind::Bytes,
881            SchemaKind::Uuid,
882        ]}
883    )]
884    // time-millis type
885    #[case(
886        r#"{"type": "string"}"#,
887        r#"{"type": "int", "logicalType": "time-millis"}"#,
888        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
889            SchemaKind::String,
890            SchemaKind::Bytes,
891            SchemaKind::Uuid,
892        ]}
893    )]
894    // time-millis type
895    #[case(
896        r#"{"type": "int"}"#,
897        r#"{"type": "long", "logicalType": "time-micros"}"#,
898        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
899            SchemaKind::Int,
900            SchemaKind::Long,
901            SchemaKind::Float,
902            SchemaKind::Double,
903            SchemaKind::Date,
904            SchemaKind::TimeMillis
905        ]}
906    )]
907    // timestamp-nanos type. This test should fail because it is not supported on schema parse_complex
908    // #[case(
909    //     r#"{"type": "string"}"#,
910    //     r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
911    //     CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
912    //         SchemaKind::Int,
913    //         SchemaKind::Long,
914    //         SchemaKind::Float,
915    //         SchemaKind::Double,
916    //         SchemaKind::Date,
917    //         SchemaKind::TimeMillis
918    //     ]}
919    // )]
920    // timestamp-millis type
921    #[case(
922        r#"{"type": "int"}"#,
923        r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
924        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
925            SchemaKind::Int,
926            SchemaKind::Long,
927            SchemaKind::Float,
928            SchemaKind::Double,
929            SchemaKind::Date,
930            SchemaKind::TimeMillis
931        ]}
932    )]
933    // timestamp-micros type
934    #[case(
935        r#"{"type": "int"}"#,
936        r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
937        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
938            SchemaKind::Int,
939            SchemaKind::Long,
940            SchemaKind::Float,
941            SchemaKind::Double,
942            SchemaKind::Date,
943            SchemaKind::TimeMillis
944        ]}
945    )]
946    // local-timestamp-millis type
947    #[case(
948        r#"{"type": "int"}"#,
949        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
950        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
951            SchemaKind::Int,
952            SchemaKind::Long,
953            SchemaKind::Float,
954            SchemaKind::Double,
955            SchemaKind::Date,
956            SchemaKind::TimeMillis
957        ]}
958    )]
959    // local-timestamp-micros type
960    #[case(
961        r#"{"type": "int"}"#,
962        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
963        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
964            SchemaKind::Int,
965            SchemaKind::Long,
966            SchemaKind::Float,
967            SchemaKind::Double,
968            SchemaKind::Date,
969            SchemaKind::TimeMillis
970        ]}
971    )]
972    // local-timestamp-nanos type. This test should fail because it is not supported on schema parse_complex
973    // #[case(
974    //     r#"{"type": "int"}"#,
975    //     r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
976    //     CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
977    //         SchemaKind::Int,
978    //         SchemaKind::Long,
979    //         SchemaKind::Float,
980    //         SchemaKind::Double,
981    //         SchemaKind::Date,
982    //         SchemaKind::TimeMillis
983    //     ]}
984    // )]
985    // When comparing different types we always get Inconclusive
986    #[case(
987        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
988        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
989        CompatibilityError::Inconclusive(String::from("writers_schema"))
990    )]
991    fn test_avro_3950_match_schemas_error(
992        #[case] writer_schema_str: &str,
993        #[case] reader_schema_str: &str,
994        #[case] expected_error: CompatibilityError,
995    ) {
996        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
997        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
998
999        assert_eq!(
1000            expected_error,
1001            SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).unwrap_err()
1002        )
1003    }
1004
1005    #[test]
1006    fn test_compatible_reader_writer_pairs() {
1007        let compatible_schemas = vec![
1008            (Schema::Null, Schema::Null),
1009            (Schema::Long, Schema::Int),
1010            (Schema::Float, Schema::Int),
1011            (Schema::Float, Schema::Long),
1012            (Schema::Double, Schema::Long),
1013            (Schema::Double, Schema::Int),
1014            (Schema::Double, Schema::Float),
1015            (Schema::String, Schema::Bytes),
1016            (Schema::Bytes, Schema::String),
1017            // logical types
1018            (Schema::Uuid, Schema::Uuid),
1019            (Schema::Uuid, Schema::String),
1020            (Schema::Date, Schema::Int),
1021            (Schema::TimeMillis, Schema::Int),
1022            (Schema::TimeMicros, Schema::Long),
1023            (Schema::TimestampMillis, Schema::Long),
1024            (Schema::TimestampMicros, Schema::Long),
1025            (Schema::TimestampNanos, Schema::Long),
1026            (Schema::LocalTimestampMillis, Schema::Long),
1027            (Schema::LocalTimestampMicros, Schema::Long),
1028            (Schema::LocalTimestampNanos, Schema::Long),
1029            (Schema::String, Schema::Uuid),
1030            (Schema::Int, Schema::Date),
1031            (Schema::Int, Schema::TimeMillis),
1032            (Schema::Long, Schema::TimeMicros),
1033            (Schema::Long, Schema::TimestampMillis),
1034            (Schema::Long, Schema::TimestampMicros),
1035            (Schema::Long, Schema::TimestampNanos),
1036            (Schema::Long, Schema::LocalTimestampMillis),
1037            (Schema::Long, Schema::LocalTimestampMicros),
1038            (Schema::Long, Schema::LocalTimestampNanos),
1039            (int_array_schema(), int_array_schema()),
1040            (long_array_schema(), int_array_schema()),
1041            (int_map_schema(), int_map_schema()),
1042            (long_map_schema(), int_map_schema()),
1043            (enum1_ab_schema(), enum1_ab_schema()),
1044            (enum1_abc_schema(), enum1_ab_schema()),
1045            (empty_union_schema(), empty_union_schema()),
1046            (int_union_schema(), int_union_schema()),
1047            (int_string_union_schema(), string_int_union_schema()),
1048            (int_union_schema(), empty_union_schema()),
1049            (long_union_schema(), int_union_schema()),
1050            (int_union_schema(), Schema::Int),
1051            (Schema::Int, int_union_schema()),
1052            (empty_record1_schema(), empty_record1_schema()),
1053            (empty_record1_schema(), a_int_record1_schema()),
1054            (a_int_record1_schema(), a_int_record1_schema()),
1055            (a_dint_record1_schema(), a_int_record1_schema()),
1056            (a_dint_record1_schema(), a_dint_record1_schema()),
1057            (a_int_record1_schema(), a_dint_record1_schema()),
1058            (a_long_record1_schema(), a_int_record1_schema()),
1059            (a_int_record1_schema(), a_int_b_int_record1_schema()),
1060            (a_dint_record1_schema(), a_int_b_int_record1_schema()),
1061            (a_int_b_dint_record1_schema(), a_int_record1_schema()),
1062            (a_dint_b_dint_record1_schema(), empty_record1_schema()),
1063            (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
1064            (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
1065            (int_list_record_schema(), int_list_record_schema()),
1066            (long_list_record_schema(), long_list_record_schema()),
1067            (long_list_record_schema(), int_list_record_schema()),
1068            (nested_optional_record(), nested_record()),
1069        ];
1070
1071        assert!(compatible_schemas
1072            .iter()
1073            .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_ok()));
1074    }
1075
1076    fn writer_schema() -> Schema {
1077        Schema::parse_str(
1078            r#"
1079      {"type":"record", "name":"Record", "fields":[
1080        {"name":"oldfield1", "type":"int"},
1081        {"name":"oldfield2", "type":"string"}
1082      ]}
1083"#,
1084        )
1085        .unwrap()
1086    }
1087
1088    #[test]
1089    fn test_missing_field() -> TestResult {
1090        let reader_schema = Schema::parse_str(
1091            r#"
1092      {"type":"record", "name":"Record", "fields":[
1093        {"name":"oldfield1", "type":"int"}
1094      ]}
1095"#,
1096        )?;
1097        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok());
1098        assert_eq!(
1099            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1100            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1101        );
1102
1103        Ok(())
1104    }
1105
1106    #[test]
1107    fn test_missing_second_field() -> TestResult {
1108        let reader_schema = Schema::parse_str(
1109            r#"
1110        {"type":"record", "name":"Record", "fields":[
1111          {"name":"oldfield2", "type":"string"}
1112        ]}
1113"#,
1114        )?;
1115        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1116        assert_eq!(
1117            CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
1118            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1119        );
1120
1121        Ok(())
1122    }
1123
1124    #[test]
1125    fn test_all_fields() -> TestResult {
1126        let reader_schema = Schema::parse_str(
1127            r#"
1128        {"type":"record", "name":"Record", "fields":[
1129          {"name":"oldfield1", "type":"int"},
1130          {"name":"oldfield2", "type":"string"}
1131        ]}
1132"#,
1133        )?;
1134        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1135        assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok());
1136
1137        Ok(())
1138    }
1139
1140    #[test]
1141    fn test_new_field_with_default() -> TestResult {
1142        let reader_schema = Schema::parse_str(
1143            r#"
1144        {"type":"record", "name":"Record", "fields":[
1145          {"name":"oldfield1", "type":"int"},
1146          {"name":"newfield1", "type":"int", "default":42}
1147        ]}
1148"#,
1149        )?;
1150        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1151        assert_eq!(
1152            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1153            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1154        );
1155
1156        Ok(())
1157    }
1158
1159    #[test]
1160    fn test_new_field() -> TestResult {
1161        let reader_schema = Schema::parse_str(
1162            r#"
1163        {"type":"record", "name":"Record", "fields":[
1164          {"name":"oldfield1", "type":"int"},
1165          {"name":"newfield1", "type":"int"}
1166        ]}
1167"#,
1168        )?;
1169        assert_eq!(
1170            CompatibilityError::MissingDefaultValue(String::from("newfield1")),
1171            SchemaCompatibility::can_read(&writer_schema(), &reader_schema).unwrap_err()
1172        );
1173        assert_eq!(
1174            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1175            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1176        );
1177
1178        Ok(())
1179    }
1180
1181    #[test]
1182    fn test_array_writer_schema() {
1183        let valid_reader = string_array_schema();
1184        let invalid_reader = string_map_schema();
1185
1186        assert!(SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).is_ok());
1187        assert_eq!(
1188            CompatibilityError::Inconclusive(String::from("writers_schema")),
1189            SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader).unwrap_err()
1190        );
1191    }
1192
1193    #[test]
1194    fn test_primitive_writer_schema() {
1195        let valid_reader = Schema::String;
1196        assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok());
1197        assert_eq!(
1198            CompatibilityError::TypeExpected {
1199                schema_type: String::from("readers_schema"),
1200                expected_type: vec![
1201                    SchemaKind::Int,
1202                    SchemaKind::Long,
1203                    SchemaKind::Float,
1204                    SchemaKind::Double,
1205                    SchemaKind::Date,
1206                    SchemaKind::TimeMillis
1207                ],
1208            },
1209            SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err()
1210        );
1211    }
1212
1213    #[test]
1214    fn test_union_reader_writer_subset_incompatibility() {
1215        // reader union schema must contain all writer union branches
1216        let union_writer = union_schema(vec![Schema::Int, Schema::String]);
1217        let union_reader = union_schema(vec![Schema::String]);
1218
1219        assert_eq!(
1220            CompatibilityError::MissingUnionElements,
1221            SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap_err()
1222        );
1223        assert!(SchemaCompatibility::can_read(&union_reader, &union_writer).is_ok());
1224    }
1225
1226    #[test]
1227    fn test_incompatible_record_field() -> TestResult {
1228        let string_schema = Schema::parse_str(
1229            r#"
1230        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1231            {"name":"field1", "type":"string"}
1232        ]}
1233        "#,
1234        )?;
1235
1236        let int_schema = Schema::parse_str(
1237            r#"
1238              {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1239                {"name":"field1", "type":"int"}
1240              ]}
1241        "#,
1242        )?;
1243
1244        assert_eq!(
1245            CompatibilityError::FieldTypeMismatch(
1246                "field1".to_owned(),
1247                Box::new(CompatibilityError::TypeExpected {
1248                    schema_type: "readers_schema".to_owned(),
1249                    expected_type: vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Uuid]
1250                })
1251            ),
1252            SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err()
1253        );
1254
1255        Ok(())
1256    }
1257
1258    #[test]
1259    fn test_enum_symbols() -> TestResult {
1260        let enum_schema1 = Schema::parse_str(
1261            r#"
1262      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
1263"#,
1264        )?;
1265        let enum_schema2 =
1266            Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
1267        assert_eq!(
1268            CompatibilityError::MissingSymbols,
1269            SchemaCompatibility::can_read(&enum_schema2, &enum_schema1).unwrap_err()
1270        );
1271        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2).is_ok());
1272
1273        Ok(())
1274    }
1275
1276    fn point_2d_schema() -> Schema {
1277        Schema::parse_str(
1278            r#"
1279      {"type":"record", "name":"Point2D", "fields":[
1280        {"name":"x", "type":"double"},
1281        {"name":"y", "type":"double"}
1282      ]}
1283    "#,
1284        )
1285        .unwrap()
1286    }
1287
1288    fn point_2d_fullname_schema() -> Schema {
1289        Schema::parse_str(
1290            r#"
1291      {"type":"record", "name":"Point", "namespace":"written", "fields":[
1292        {"name":"x", "type":"double"},
1293        {"name":"y", "type":"double"}
1294      ]}
1295    "#,
1296        )
1297        .unwrap()
1298    }
1299
1300    fn point_3d_no_default_schema() -> Schema {
1301        Schema::parse_str(
1302            r#"
1303      {"type":"record", "name":"Point", "fields":[
1304        {"name":"x", "type":"double"},
1305        {"name":"y", "type":"double"},
1306        {"name":"z", "type":"double"}
1307      ]}
1308    "#,
1309        )
1310        .unwrap()
1311    }
1312
1313    fn point_3d_schema() -> Schema {
1314        Schema::parse_str(
1315            r#"
1316      {"type":"record", "name":"Point3D", "fields":[
1317        {"name":"x", "type":"double"},
1318        {"name":"y", "type":"double"},
1319        {"name":"z", "type":"double", "default": 0.0}
1320      ]}
1321    "#,
1322        )
1323        .unwrap()
1324    }
1325
1326    fn point_3d_match_name_schema() -> Schema {
1327        Schema::parse_str(
1328            r#"
1329      {"type":"record", "name":"Point", "fields":[
1330        {"name":"x", "type":"double"},
1331        {"name":"y", "type":"double"},
1332        {"name":"z", "type":"double", "default": 0.0}
1333      ]}
1334    "#,
1335        )
1336        .unwrap()
1337    }
1338
1339    #[test]
1340    fn test_union_resolution_no_structure_match() {
1341        // short name match, but no structure match
1342        let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
1343        assert_eq!(
1344            CompatibilityError::MissingUnionElements,
1345            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1346        );
1347    }
1348
1349    #[test]
1350    fn test_union_resolution_first_structure_match_2d() {
1351        // multiple structure matches with no name matches
1352        let read_schema = union_schema(vec![
1353            Schema::Null,
1354            point_3d_no_default_schema(),
1355            point_2d_schema(),
1356            point_3d_schema(),
1357        ]);
1358        assert_eq!(
1359            CompatibilityError::MissingUnionElements,
1360            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1361        );
1362    }
1363
1364    #[test]
1365    fn test_union_resolution_first_structure_match_3d() {
1366        // multiple structure matches with no name matches
1367        let read_schema = union_schema(vec![
1368            Schema::Null,
1369            point_3d_no_default_schema(),
1370            point_3d_schema(),
1371            point_2d_schema(),
1372        ]);
1373        assert_eq!(
1374            CompatibilityError::MissingUnionElements,
1375            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1376        );
1377    }
1378
1379    #[test]
1380    fn test_union_resolution_named_structure_match() {
1381        // multiple structure matches with a short name match
1382        let read_schema = union_schema(vec![
1383            Schema::Null,
1384            point_2d_schema(),
1385            point_3d_match_name_schema(),
1386            point_3d_schema(),
1387        ]);
1388        assert_eq!(
1389            CompatibilityError::MissingUnionElements,
1390            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1391        );
1392    }
1393
1394    #[test]
1395    fn test_union_resolution_full_name_match() {
1396        // there is a full name match that should be chosen
1397        let read_schema = union_schema(vec![
1398            Schema::Null,
1399            point_2d_schema(),
1400            point_3d_match_name_schema(),
1401            point_3d_schema(),
1402            point_2d_fullname_schema(),
1403        ]);
1404        assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).is_ok());
1405    }
1406
1407    #[test]
1408    fn test_avro_3772_enum_default() -> TestResult {
1409        let writer_raw_schema = r#"
1410        {
1411          "type": "record",
1412          "name": "test",
1413          "fields": [
1414            {"name": "a", "type": "long", "default": 42},
1415            {"name": "b", "type": "string"},
1416            {
1417              "name": "c",
1418              "type": {
1419                "type": "enum",
1420                "name": "suit",
1421                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1422                "default": "spades"
1423              }
1424            }
1425          ]
1426        }
1427        "#;
1428
1429        let reader_raw_schema = r#"
1430        {
1431          "type": "record",
1432          "name": "test",
1433          "fields": [
1434            {"name": "a", "type": "long", "default": 42},
1435            {"name": "b", "type": "string"},
1436            {
1437              "name": "c",
1438              "type": {
1439                 "type": "enum",
1440                 "name": "suit",
1441                 "symbols": ["diamonds", "spades", "ninja", "hearts"],
1442                 "default": "spades"
1443              }
1444            }
1445          ]
1446        }
1447      "#;
1448        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1449        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1450        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
1451        let mut record = Record::new(writer.schema()).unwrap();
1452        record.put("a", 27i64);
1453        record.put("b", "foo");
1454        record.put("c", "clubs");
1455        writer.append(record).unwrap();
1456        let input = writer.into_inner()?;
1457        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1458        assert_eq!(
1459            reader.next().unwrap().unwrap(),
1460            Value::Record(vec![
1461                ("a".to_string(), Value::Long(27)),
1462                ("b".to_string(), Value::String("foo".to_string())),
1463                ("c".to_string(), Value::Enum(1, "spades".to_string())),
1464            ])
1465        );
1466        assert!(reader.next().is_none());
1467
1468        Ok(())
1469    }
1470
1471    #[test]
1472    fn test_avro_3772_enum_default_less_symbols() -> TestResult {
1473        let writer_raw_schema = r#"
1474        {
1475          "type": "record",
1476          "name": "test",
1477          "fields": [
1478            {"name": "a", "type": "long", "default": 42},
1479            {"name": "b", "type": "string"},
1480            {
1481              "name": "c",
1482              "type": {
1483                "type": "enum",
1484                "name": "suit",
1485                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1486                "default": "spades"
1487              }
1488            }
1489          ]
1490        }
1491        "#;
1492
1493        let reader_raw_schema = r#"
1494        {
1495          "type": "record",
1496          "name": "test",
1497          "fields": [
1498            {"name": "a", "type": "long", "default": 42},
1499            {"name": "b", "type": "string"},
1500            {
1501              "name": "c",
1502              "type": {
1503                 "type": "enum",
1504                  "name": "suit",
1505                  "symbols": ["hearts", "spades"],
1506                  "default": "spades"
1507              }
1508            }
1509          ]
1510        }
1511      "#;
1512        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1513        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1514        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
1515        let mut record = Record::new(writer.schema()).unwrap();
1516        record.put("a", 27i64);
1517        record.put("b", "foo");
1518        record.put("c", "hearts");
1519        writer.append(record).unwrap();
1520        let input = writer.into_inner()?;
1521        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1522        assert_eq!(
1523            reader.next().unwrap().unwrap(),
1524            Value::Record(vec![
1525                ("a".to_string(), Value::Long(27)),
1526                ("b".to_string(), Value::String("foo".to_string())),
1527                ("c".to_string(), Value::Enum(0, "hearts".to_string())),
1528            ])
1529        );
1530        assert!(reader.next().is_none());
1531
1532        Ok(())
1533    }
1534
1535    #[test]
1536    fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
1537    {
1538        let schema_v1 = Schema::parse_str(
1539            r#"
1540        {
1541            "type": "record",
1542            "name": "Conference",
1543            "namespace": "advdaba",
1544            "fields": [
1545                {"type": "string", "name": "name"},
1546                {"type": "long", "name": "date"}
1547            ]
1548        }"#,
1549        )?;
1550
1551        let schema_v2 = Schema::parse_str(
1552            r#"
1553        {
1554            "type": "record",
1555            "name": "Conference",
1556            "namespace": "advdaba",
1557            "fields": [
1558                {"type": "string", "name": "name"},
1559                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1560            ]
1561        }"#,
1562        )?;
1563
1564        assert!(SchemaCompatibility::mutual_read(&schema_v1, &schema_v2).is_ok());
1565
1566        Ok(())
1567    }
1568
1569    #[test]
1570    fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
1571        let schema_v1 = Schema::parse_str(
1572            r#"
1573        {
1574            "type": "record",
1575            "name": "Conference",
1576            "namespace": "advdaba",
1577            "fields": [
1578                {"type": "string", "name": "name"},
1579                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1580            ]
1581        }"#,
1582        )?;
1583
1584        let schema_v2 = Schema::parse_str(
1585            r#"
1586        {
1587            "type": "record",
1588            "name": "Conference",
1589            "namespace": "advdaba",
1590            "fields": [
1591                {"type": "string", "name": "name"},
1592                {"type": "long", "name": "time"}
1593            ]
1594        }"#,
1595        )?;
1596
1597        assert!(SchemaCompatibility::can_read(&schema_v2, &schema_v1).is_ok());
1598        assert_eq!(
1599            CompatibilityError::MissingDefaultValue(String::from("time")),
1600            SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err()
1601        );
1602
1603        Ok(())
1604    }
1605
1606    #[test]
1607    fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
1608        let schemas = [
1609            // Record schemas
1610            (
1611                Schema::parse_str(
1612                    r#"{
1613              "type": "record",
1614              "name": "Statistics",
1615              "fields": [
1616                { "name": "success", "type": "int" },
1617                { "name": "fail", "type": "int" },
1618                { "name": "time", "type": "string" },
1619                { "name": "max", "type": "int", "default": 0 }
1620              ]
1621            }"#,
1622                )?,
1623                Schema::parse_str(
1624                    r#"{
1625              "type": "record",
1626              "name": "Statistics",
1627              "namespace": "my.namespace",
1628              "fields": [
1629                { "name": "success", "type": "int" },
1630                { "name": "fail", "type": "int" },
1631                { "name": "time", "type": "string" },
1632                { "name": "average", "type": "int", "default": 0}
1633              ]
1634            }"#,
1635                )?,
1636            ),
1637            // Enum schemas
1638            (
1639                Schema::parse_str(
1640                    r#"{
1641                    "type": "enum",
1642                    "name": "Suit",
1643                    "symbols": ["diamonds", "spades", "clubs"]
1644                }"#,
1645                )?,
1646                Schema::parse_str(
1647                    r#"{
1648                    "type": "enum",
1649                    "name": "Suit",
1650                    "namespace": "my.namespace",
1651                    "symbols": ["diamonds", "spades", "clubs", "hearts"]
1652                }"#,
1653                )?,
1654            ),
1655            // Fixed schemas
1656            (
1657                Schema::parse_str(
1658                    r#"{
1659                    "type": "fixed",
1660                    "name": "EmployeeId",
1661                    "size": 16
1662                }"#,
1663                )?,
1664                Schema::parse_str(
1665                    r#"{
1666                    "type": "fixed",
1667                    "name": "EmployeeId",
1668                    "namespace": "my.namespace",
1669                    "size": 16
1670                }"#,
1671                )?,
1672            ),
1673        ];
1674
1675        for (schema_1, schema_2) in schemas {
1676            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1677        }
1678
1679        Ok(())
1680    }
1681
1682    #[test]
1683    fn test_can_read_compatibility_errors() -> TestResult {
1684        let schemas = [
1685            (
1686                Schema::parse_str(
1687                r#"{
1688                    "type": "record",
1689                    "name": "StatisticsMap",
1690                    "fields": [
1691                        {"name": "average", "type": "int", "default": 0},
1692                        {"name": "success", "type": {"type": "map", "values": "int"}}
1693                    ]
1694                }"#)?,
1695                Schema::parse_str(
1696                        r#"{
1697                    "type": "record",
1698                    "name": "StatisticsMap",
1699                    "fields": [
1700                        {"name": "average", "type": "int", "default": 0},
1701                        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
1702                    ]
1703                }"#)?,
1704                "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema"
1705            ),
1706            (
1707                Schema::parse_str(
1708                    r#"{
1709                        "type": "record",
1710                        "name": "StatisticsArray",
1711                        "fields": [
1712                            {"name": "max_values", "type": {"type": "array", "items": "int"}}
1713                        ]
1714                    }"#)?,
1715                    Schema::parse_str(
1716                    r#"{
1717                        "type": "record",
1718                        "name": "StatisticsArray",
1719                        "fields": [
1720                            {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
1721                        ]
1722                    }"#)?,
1723                    "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema"
1724            )
1725        ];
1726
1727        for (schema_1, schema_2, error) in schemas {
1728            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1729            assert_eq!(
1730                error,
1731                SchemaCompatibility::can_read(&schema_2, &schema_1)
1732                    .unwrap_err()
1733                    .to_string()
1734            );
1735        }
1736
1737        Ok(())
1738    }
1739
1740    #[test]
1741    fn avro_3974_can_read_schema_references() -> TestResult {
1742        let schema_strs = vec![
1743            r#"{
1744          "type": "record",
1745          "name": "Child",
1746          "namespace": "avro",
1747          "fields": [
1748            {
1749              "name": "val",
1750              "type": "int"
1751            }
1752          ]
1753        }
1754        "#,
1755            r#"{
1756          "type": "record",
1757          "name": "Parent",
1758          "namespace": "avro",
1759          "fields": [
1760            {
1761              "name": "child",
1762              "type": "avro.Child"
1763            }
1764          ]
1765        }
1766        "#,
1767        ];
1768
1769        let schemas = Schema::parse_list(&schema_strs).unwrap();
1770        SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1771
1772        Ok(())
1773    }
1774}