apache_avro/
schema_compatibility.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for checking schema compatibility
19use crate::{
20    error::CompatibilityError,
21    schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
22};
23use std::{
24    collections::{HashSet, hash_map::DefaultHasher},
25    hash::Hasher,
26    ptr,
27};
28
29fn match_ref_schemas(
30    writers_schema: &Schema,
31    readers_schema: &Schema,
32) -> Result<(), CompatibilityError> {
33    match (readers_schema, writers_schema) {
34        (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
35            if r_name == w_name {
36                Ok(())
37            } else {
38                Err(CompatibilityError::NameMismatch {
39                    writer_name: w_name.fullname(None),
40                    reader_name: r_name.fullname(None),
41                })
42            }
43        }
44        _ => Err(CompatibilityError::WrongType {
45            writer_schema_type: format!("{writers_schema:#?}"),
46            reader_schema_type: format!("{readers_schema:#?}"),
47        }),
48    }
49}
50
/// Entry point for Avro schema compatibility checks.
///
/// This is a stateless, zero-sized type: the checks are exposed as associated
/// functions such as [`SchemaCompatibility::can_read`] and
/// [`SchemaCompatibility::mutual_read`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SchemaCompatibility;
52
53struct Checker {
54    recursion: HashSet<(u64, u64)>,
55}
56
57impl Checker {
58    /// Create a new checker, with recursion set to an empty set.
59    pub(crate) fn new() -> Self {
60        Self {
61            recursion: HashSet::new(),
62        }
63    }
64
65    pub(crate) fn can_read(
66        &mut self,
67        writers_schema: &Schema,
68        readers_schema: &Schema,
69    ) -> Result<(), CompatibilityError> {
70        self.full_match_schemas(writers_schema, readers_schema)
71    }
72
73    pub(crate) fn full_match_schemas(
74        &mut self,
75        writers_schema: &Schema,
76        readers_schema: &Schema,
77    ) -> Result<(), CompatibilityError> {
78        if self.recursion_in_progress(writers_schema, readers_schema) {
79            return Ok(());
80        }
81
82        SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
83
84        let w_type = SchemaKind::from(writers_schema);
85        let r_type = SchemaKind::from(readers_schema);
86
87        if w_type != SchemaKind::Union
88            && (r_type.is_primitive()
89                || r_type == SchemaKind::Fixed
90                || r_type == SchemaKind::Uuid
91                || r_type == SchemaKind::Date
92                || r_type == SchemaKind::TimeMillis
93                || r_type == SchemaKind::TimeMicros
94                || r_type == SchemaKind::TimestampMillis
95                || r_type == SchemaKind::TimestampMicros
96                || r_type == SchemaKind::TimestampNanos
97                || r_type == SchemaKind::LocalTimestampMillis
98                || r_type == SchemaKind::LocalTimestampMicros
99                || r_type == SchemaKind::LocalTimestampNanos)
100        {
101            return Ok(());
102        }
103
104        match r_type {
105            SchemaKind::Ref => match_ref_schemas(writers_schema, readers_schema),
106            SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
107            SchemaKind::Map => {
108                if let Schema::Map(w_m) = writers_schema {
109                    match readers_schema {
110                        Schema::Map(r_m) => self.full_match_schemas(&w_m.types, &r_m.types),
111                        _ => Err(CompatibilityError::WrongType {
112                            writer_schema_type: format!("{writers_schema:#?}"),
113                            reader_schema_type: format!("{readers_schema:#?}"),
114                        }),
115                    }
116                } else {
117                    Err(CompatibilityError::TypeExpected {
118                        schema_type: String::from("writers_schema"),
119                        expected_type: vec![SchemaKind::Record],
120                    })
121                }
122            }
123            SchemaKind::Array => {
124                if let Schema::Array(w_a) = writers_schema {
125                    match readers_schema {
126                        Schema::Array(r_a) => self.full_match_schemas(&w_a.items, &r_a.items),
127                        _ => Err(CompatibilityError::WrongType {
128                            writer_schema_type: format!("{writers_schema:#?}"),
129                            reader_schema_type: format!("{readers_schema:#?}"),
130                        }),
131                    }
132                } else {
133                    Err(CompatibilityError::TypeExpected {
134                        schema_type: String::from("writers_schema"),
135                        expected_type: vec![SchemaKind::Array],
136                    })
137                }
138            }
139            SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
140            SchemaKind::Enum => {
141                // reader's symbols must contain all writer's symbols
142                if let Schema::Enum(EnumSchema {
143                    symbols: w_symbols, ..
144                }) = writers_schema
145                {
146                    if let Schema::Enum(EnumSchema {
147                        symbols: r_symbols, ..
148                    }) = readers_schema
149                    {
150                        if w_symbols.iter().all(|e| r_symbols.contains(e)) {
151                            return Ok(());
152                        }
153                    }
154                }
155                Err(CompatibilityError::MissingSymbols)
156            }
157            _ => {
158                if w_type == SchemaKind::Union {
159                    if let Schema::Union(r) = writers_schema {
160                        if r.schemas.len() == 1 {
161                            return self.full_match_schemas(&r.schemas[0], readers_schema);
162                        }
163                    }
164                }
165                Err(CompatibilityError::Inconclusive(String::from(
166                    "writers_schema",
167                )))
168            }
169        }
170    }
171
172    fn match_record_schemas(
173        &mut self,
174        writers_schema: &Schema,
175        readers_schema: &Schema,
176    ) -> Result<(), CompatibilityError> {
177        let w_type = SchemaKind::from(writers_schema);
178
179        if w_type == SchemaKind::Union {
180            return Err(CompatibilityError::TypeExpected {
181                schema_type: String::from("writers_schema"),
182                expected_type: vec![SchemaKind::Record],
183            });
184        }
185
186        if let Schema::Record(RecordSchema {
187            fields: w_fields,
188            lookup: w_lookup,
189            ..
190        }) = writers_schema
191        {
192            if let Schema::Record(RecordSchema {
193                fields: r_fields, ..
194            }) = readers_schema
195            {
196                for field in r_fields.iter() {
197                    // get all field names in a vector (field.name + aliases)
198                    let mut fields_names = vec![&field.name];
199                    if let Some(ref aliases) = field.aliases {
200                        for alias in aliases {
201                            fields_names.push(alias);
202                        }
203                    }
204
205                    // Check whether any of the possible fields names are in the writer schema.
206                    // If the field was found, then it must have the exact same name with the writer field,
207                    // otherwise we would have a false positive with the writers aliases
208                    let position = fields_names.iter().find_map(|field_name| {
209                        if let Some(pos) = w_lookup.get(*field_name) {
210                            if &w_fields[*pos].name == *field_name {
211                                return Some(pos);
212                            }
213                        }
214                        None
215                    });
216
217                    match position {
218                        Some(pos) => {
219                            if let Err(err) =
220                                self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
221                            {
222                                return Err(CompatibilityError::FieldTypeMismatch(
223                                    field.name.clone(),
224                                    Box::new(err),
225                                ));
226                            }
227                        }
228                        _ => {
229                            if field.default.is_none() {
230                                return Err(CompatibilityError::MissingDefaultValue(
231                                    field.name.clone(),
232                                ));
233                            }
234                        }
235                    }
236                }
237            }
238        }
239        Ok(())
240    }
241
242    fn match_union_schemas(
243        &mut self,
244        writers_schema: &Schema,
245        readers_schema: &Schema,
246    ) -> Result<(), CompatibilityError> {
247        if let Schema::Union(u) = writers_schema {
248            if u.schemas
249                .iter()
250                .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok())
251            {
252                return Ok(());
253            } else {
254                return Err(CompatibilityError::MissingUnionElements);
255            }
256        } else if let Schema::Union(u) = readers_schema {
257            // This check is needed because the writer_schema can be not union
258            // but the type can be contain in the union of the reader schema
259            // e.g. writer_schema is string and reader_schema is [string, int]
260            if u.schemas
261                .iter()
262                .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok())
263            {
264                return Ok(());
265            }
266        }
267        Err(CompatibilityError::MissingUnionElements)
268    }
269
270    fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
271        let mut hasher = DefaultHasher::new();
272        ptr::hash(writers_schema, &mut hasher);
273        let w_hash = hasher.finish();
274
275        hasher = DefaultHasher::new();
276        ptr::hash(readers_schema, &mut hasher);
277        let r_hash = hasher.finish();
278
279        let key = (w_hash, r_hash);
280        // This is a shortcut to add if not exists *and* return false. It will return true
281        // if it was able to insert.
282        !self.recursion.insert(key)
283    }
284}
285
286impl SchemaCompatibility {
287    /// `can_read` performs a full, recursive check that a datum written using the
288    /// writers_schema can be read using the readers_schema.
289    pub fn can_read(
290        writers_schema: &Schema,
291        readers_schema: &Schema,
292    ) -> Result<(), CompatibilityError> {
293        let mut c = Checker::new();
294        c.can_read(writers_schema, readers_schema)
295    }
296
297    /// `mutual_read` performs a full, recursive check that a datum written using either
298    /// the writers_schema or the readers_schema can be read using the other schema.
299    pub fn mutual_read(
300        writers_schema: &Schema,
301        readers_schema: &Schema,
302    ) -> Result<(), CompatibilityError> {
303        SchemaCompatibility::can_read(writers_schema, readers_schema)?;
304        SchemaCompatibility::can_read(readers_schema, writers_schema)
305    }
306
307    ///  `match_schemas` performs a basic check that a datum written with the
308    ///  writers_schema could be read using the readers_schema. This check only includes
309    ///  matching the types, including schema promotion, and matching the full name for
310    ///  named types. Aliases for named types are not supported here, and the rust
311    ///  implementation of Avro in general does not include support for aliases (I think).
312    pub(crate) fn match_schemas(
313        writers_schema: &Schema,
314        readers_schema: &Schema,
315    ) -> Result<(), CompatibilityError> {
316        fn check_reader_type_multi(
317            reader_type: SchemaKind,
318            allowed_reader_types: Vec<SchemaKind>,
319            writer_type: SchemaKind,
320        ) -> Result<(), CompatibilityError> {
321            if allowed_reader_types.contains(&reader_type) {
322                Ok(())
323            } else {
324                let mut allowed_types: Vec<SchemaKind> = vec![writer_type];
325                allowed_types.extend_from_slice(allowed_reader_types.as_slice());
326                Err(CompatibilityError::TypeExpected {
327                    schema_type: String::from("readers_schema"),
328                    expected_type: allowed_types,
329                })
330            }
331        }
332
333        fn check_reader_type(
334            reader_type: SchemaKind,
335            allowed_reader_type: SchemaKind,
336            writer_type: SchemaKind,
337        ) -> Result<(), CompatibilityError> {
338            if reader_type == allowed_reader_type {
339                Ok(())
340            } else {
341                Err(CompatibilityError::TypeExpected {
342                    schema_type: String::from("readers_schema"),
343                    expected_type: vec![writer_type, allowed_reader_type],
344                })
345            }
346        }
347
348        fn check_writer_type(
349            writers_schema: &Schema,
350            allowed_schema: &Schema,
351            expected_schema_types: Vec<SchemaKind>,
352        ) -> Result<(), CompatibilityError> {
353            if *allowed_schema == *writers_schema {
354                Ok(())
355            } else {
356                Err(CompatibilityError::TypeExpected {
357                    schema_type: String::from("writers_schema"),
358                    expected_type: expected_schema_types,
359                })
360            }
361        }
362
363        let w_type = SchemaKind::from(writers_schema);
364        let r_type = SchemaKind::from(readers_schema);
365
366        if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
367            return Ok(());
368        }
369
370        if w_type == r_type {
371            if r_type.is_primitive() {
372                return Ok(());
373            }
374
375            match r_type {
376                SchemaKind::Record | SchemaKind::Enum => {
377                    let msg = format!("A {readers_schema} type must always has a name");
378                    let writers_name = writers_schema.name().expect(&msg);
379                    let readers_name = readers_schema.name().expect(&msg);
380
381                    if writers_name.name == readers_name.name {
382                        return Ok(());
383                    }
384
385                    return Err(CompatibilityError::NameMismatch {
386                        writer_name: writers_name.name.clone(),
387                        reader_name: readers_name.name.clone(),
388                    });
389                }
390                SchemaKind::Fixed => {
391                    if let Schema::Fixed(FixedSchema {
392                        name: w_name,
393                        aliases: _,
394                        doc: _w_doc,
395                        size: w_size,
396                        default: _w_default,
397                        attributes: _,
398                    }) = writers_schema
399                    {
400                        if let Schema::Fixed(FixedSchema {
401                            name: r_name,
402                            aliases: _,
403                            doc: _r_doc,
404                            size: r_size,
405                            default: _r_default,
406                            attributes: _,
407                        }) = readers_schema
408                        {
409                            return (w_name.name == r_name.name && w_size == r_size)
410                                .then_some(())
411                                .ok_or(CompatibilityError::FixedMismatch);
412                        }
413                    }
414                }
415                SchemaKind::Map => {
416                    if let Schema::Map(w_m) = writers_schema {
417                        if let Schema::Map(r_m) = readers_schema {
418                            return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
419                        }
420                    }
421                }
422                SchemaKind::Array => {
423                    if let Schema::Array(w_a) = writers_schema {
424                        if let Schema::Array(r_a) = readers_schema {
425                            return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
426                        }
427                    }
428                }
429                SchemaKind::Uuid => {
430                    return check_writer_type(
431                        writers_schema,
432                        readers_schema,
433                        vec![r_type, SchemaKind::String],
434                    );
435                }
436                SchemaKind::Date | SchemaKind::TimeMillis => {
437                    return check_writer_type(
438                        writers_schema,
439                        readers_schema,
440                        vec![r_type, SchemaKind::Int],
441                    );
442                }
443                SchemaKind::TimeMicros
444                | SchemaKind::TimestampNanos
445                | SchemaKind::TimestampMillis
446                | SchemaKind::TimestampMicros
447                | SchemaKind::LocalTimestampMillis
448                | SchemaKind::LocalTimestampMicros
449                | SchemaKind::LocalTimestampNanos => {
450                    return check_writer_type(
451                        writers_schema,
452                        readers_schema,
453                        vec![r_type, SchemaKind::Long],
454                    );
455                }
456                SchemaKind::Duration => {
457                    return Ok(());
458                }
459                SchemaKind::Ref => return match_ref_schemas(writers_schema, readers_schema),
460                _ => {
461                    return Err(CompatibilityError::Inconclusive(String::from(
462                        "readers_schema",
463                    )));
464                }
465            };
466        }
467
468        // Here are the checks for primitive types
469        match w_type {
470            SchemaKind::Int => check_reader_type_multi(
471                r_type,
472                vec![
473                    SchemaKind::Long,
474                    SchemaKind::Float,
475                    SchemaKind::Double,
476                    SchemaKind::Date,
477                    SchemaKind::TimeMillis,
478                ],
479                w_type,
480            ),
481            SchemaKind::Long => check_reader_type_multi(
482                r_type,
483                vec![
484                    SchemaKind::Float,
485                    SchemaKind::Double,
486                    SchemaKind::TimeMicros,
487                    SchemaKind::TimestampMillis,
488                    SchemaKind::TimestampMicros,
489                    SchemaKind::TimestampNanos,
490                    SchemaKind::LocalTimestampMillis,
491                    SchemaKind::LocalTimestampMicros,
492                    SchemaKind::LocalTimestampNanos,
493                ],
494                w_type,
495            ),
496            SchemaKind::Float => {
497                check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type)
498            }
499            SchemaKind::String => {
500                check_reader_type_multi(r_type, vec![SchemaKind::Bytes, SchemaKind::Uuid], w_type)
501            }
502            SchemaKind::Bytes => check_reader_type(r_type, SchemaKind::String, w_type),
503            SchemaKind::Uuid => check_reader_type(r_type, SchemaKind::String, w_type),
504            SchemaKind::Date | SchemaKind::TimeMillis => {
505                check_reader_type(r_type, SchemaKind::Int, w_type)
506            }
507            SchemaKind::TimeMicros
508            | SchemaKind::TimestampMicros
509            | SchemaKind::TimestampMillis
510            | SchemaKind::TimestampNanos
511            | SchemaKind::LocalTimestampMillis
512            | SchemaKind::LocalTimestampMicros
513            | SchemaKind::LocalTimestampNanos => {
514                check_reader_type(r_type, SchemaKind::Long, w_type)
515            }
516            _ => Err(CompatibilityError::Inconclusive(String::from(
517                "writers_schema",
518            ))),
519        }
520    }
521}
522
523#[cfg(test)]
524mod tests {
525    use super::*;
526    use crate::{
527        Codec, Reader, Writer,
528        types::{Record, Value},
529    };
530    use apache_avro_test_helper::TestResult;
531    use rstest::*;
532
533    fn int_array_schema() -> Schema {
534        Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
535    }
536
537    fn long_array_schema() -> Schema {
538        Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
539    }
540
541    fn string_array_schema() -> Schema {
542        Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
543    }
544
545    fn int_map_schema() -> Schema {
546        Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
547    }
548
549    fn long_map_schema() -> Schema {
550        Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
551    }
552
553    fn string_map_schema() -> Schema {
554        Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
555    }
556
557    fn enum1_ab_schema() -> Schema {
558        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
559    }
560
561    fn enum1_abc_schema() -> Schema {
562        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
563    }
564
565    fn enum1_bc_schema() -> Schema {
566        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
567    }
568
569    fn enum2_ab_schema() -> Schema {
570        Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
571    }
572
573    fn empty_record1_schema() -> Schema {
574        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
575    }
576
577    fn empty_record2_schema() -> Schema {
578        Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
579    }
580
581    fn a_int_record1_schema() -> Schema {
582        Schema::parse_str(
583            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
584        )
585        .unwrap()
586    }
587
588    fn a_long_record1_schema() -> Schema {
589        Schema::parse_str(
590            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
591        )
592        .unwrap()
593    }
594
595    fn a_int_b_int_record1_schema() -> Schema {
596        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
597    }
598
599    fn a_dint_record1_schema() -> Schema {
600        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
601    }
602
603    fn a_int_b_dint_record1_schema() -> Schema {
604        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
605    }
606
607    fn a_dint_b_dint_record1_schema() -> Schema {
608        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
609    }
610
611    fn nested_record() -> Schema {
612        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
613    }
614
615    fn nested_optional_record() -> Schema {
616        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
617    }
618
619    fn int_list_record_schema() -> Schema {
620        Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
621    }
622
623    fn long_list_record_schema() -> Schema {
624        Schema::parse_str(
625            r#"
626      {
627        "type":"record", "name":"List", "fields": [
628          {"name": "head", "type": "long"},
629          {"name": "tail", "type": "array", "items": "long"}
630      ]}
631"#,
632        )
633        .unwrap()
634    }
635
636    fn union_schema(schemas: Vec<Schema>) -> Schema {
637        let schema_string = schemas
638            .iter()
639            .map(|s| s.canonical_form())
640            .collect::<Vec<String>>()
641            .join(",");
642        Schema::parse_str(&format!("[{schema_string}]")).unwrap()
643    }
644
645    fn empty_union_schema() -> Schema {
646        union_schema(vec![])
647    }
648
649    // unused
650    // fn null_union_schema() -> Schema { union_schema(vec![Schema::Null]) }
651
652    fn int_union_schema() -> Schema {
653        union_schema(vec![Schema::Int])
654    }
655
656    fn long_union_schema() -> Schema {
657        union_schema(vec![Schema::Long])
658    }
659
660    fn string_union_schema() -> Schema {
661        union_schema(vec![Schema::String])
662    }
663
664    fn int_string_union_schema() -> Schema {
665        union_schema(vec![Schema::Int, Schema::String])
666    }
667
668    fn string_int_union_schema() -> Schema {
669        union_schema(vec![Schema::String, Schema::Int])
670    }
671
672    #[test]
673    fn test_broken() {
674        assert_eq!(
675            CompatibilityError::MissingUnionElements,
676            SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema())
677                .unwrap_err()
678        )
679    }
680
681    #[test]
682    fn test_incompatible_reader_writer_pairs() {
683        let incompatible_schemas = vec![
684            // null
685            (Schema::Null, Schema::Int),
686            (Schema::Null, Schema::Long),
687            // boolean
688            (Schema::Boolean, Schema::Int),
689            // int
690            (Schema::Int, Schema::Null),
691            (Schema::Int, Schema::Boolean),
692            (Schema::Int, Schema::Long),
693            (Schema::Int, Schema::Float),
694            (Schema::Int, Schema::Double),
695            // long
696            (Schema::Long, Schema::Float),
697            (Schema::Long, Schema::Double),
698            // float
699            (Schema::Float, Schema::Double),
700            // string
701            (Schema::String, Schema::Boolean),
702            (Schema::String, Schema::Int),
703            // bytes
704            (Schema::Bytes, Schema::Null),
705            (Schema::Bytes, Schema::Int),
706            // logical types
707            (Schema::TimeMicros, Schema::Int),
708            (Schema::TimestampMillis, Schema::Int),
709            (Schema::TimestampMicros, Schema::Int),
710            (Schema::TimestampNanos, Schema::Int),
711            (Schema::LocalTimestampMillis, Schema::Int),
712            (Schema::LocalTimestampMicros, Schema::Int),
713            (Schema::LocalTimestampNanos, Schema::Int),
714            (Schema::Date, Schema::Long),
715            (Schema::TimeMillis, Schema::Long),
716            // array and maps
717            (int_array_schema(), long_array_schema()),
718            (int_map_schema(), int_array_schema()),
719            (int_array_schema(), int_map_schema()),
720            (int_map_schema(), long_map_schema()),
721            // enum
722            (enum1_ab_schema(), enum1_abc_schema()),
723            (enum1_bc_schema(), enum1_abc_schema()),
724            (enum1_ab_schema(), enum2_ab_schema()),
725            (Schema::Int, enum2_ab_schema()),
726            (enum2_ab_schema(), Schema::Int),
727            //union
728            (int_union_schema(), int_string_union_schema()),
729            (string_union_schema(), int_string_union_schema()),
730            //record
731            (empty_record2_schema(), empty_record1_schema()),
732            (a_int_record1_schema(), empty_record1_schema()),
733            (a_int_b_dint_record1_schema(), empty_record1_schema()),
734            (int_list_record_schema(), long_list_record_schema()),
735            (nested_record(), nested_optional_record()),
736        ];
737
738        assert!(
739            incompatible_schemas
740                .iter()
741                .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
742        );
743    }
744
745    #[rstest]
746    // Record type test
747    #[case(
748        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
749        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
750    )]
751    // Fixed type test
752    #[case(
753        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
754        r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
755    )]
756    // Enum type test
757    #[case(
758        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
759        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
760    )]
761    // Map type test
762    #[case(
763        r#"{"type": "map", "values": "int"}"#,
764        r#"{"type": "map", "values": "long"}"#
765    )]
766    // Date type
767    #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
768    // time-millis type
769    #[case(
770        r#"{"type": "int"}"#,
771        r#"{"type": "int", "logicalType": "time-millis"}"#
772    )]
773    // time-millis type
774    #[case(
775        r#"{"type": "long"}"#,
776        r#"{"type": "long", "logicalType": "time-micros"}"#
777    )]
778    // timetimestamp-nanos type
779    #[case(
780        r#"{"type": "long"}"#,
781        r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
782    )]
783    // timestamp-millis type
784    #[case(
785        r#"{"type": "long"}"#,
786        r#"{"type": "long", "logicalType": "timestamp-millis"}"#
787    )]
788    // timestamp-micros type
789    #[case(
790        r#"{"type": "long"}"#,
791        r#"{"type": "long", "logicalType": "timestamp-micros"}"#
792    )]
793    // local-timestamp-millis type
794    #[case(
795        r#"{"type": "long"}"#,
796        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
797    )]
798    // local-timestamp-micros type
799    #[case(
800        r#"{"type": "long"}"#,
801        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
802    )]
803    // local-timestamp-nanos type
804    #[case(
805        r#"{"type": "long"}"#,
806        r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
807    )]
808    // Array type test
809    #[case(
810        r#"{"type": "array", "items": "int"}"#,
811        r#"{"type": "array", "items": "long"}"#
812    )]
813    fn test_avro_3950_match_schemas_ok(
814        #[case] writer_schema_str: &str,
815        #[case] reader_schema_str: &str,
816    ) {
817        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
818        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
819
820        assert!(SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).is_ok());
821    }
822
823    #[rstest]
824    // Record type test
825    #[case(
826        r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
827        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
828        CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
829    )]
830    // Fixed type test
831    #[case(
832        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
833        r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
834        CompatibilityError::FixedMismatch
835    )]
836    // Enum type test
837    #[case(
838        r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
839        r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
840        CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
841    )]
842    // Map type test
843    #[case(
844        r#"{"type":"map", "values": "long"}"#,
845        r#"{"type":"map", "values": "int"}"#,
846        CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
847            SchemaKind::Long,
848            SchemaKind::Float,
849            SchemaKind::Double,
850            SchemaKind::TimeMicros,
851            SchemaKind::TimestampMillis,
852            SchemaKind::TimestampMicros,
853            SchemaKind::TimestampNanos,
854            SchemaKind::LocalTimestampMillis,
855            SchemaKind::LocalTimestampMicros,
856            SchemaKind::LocalTimestampNanos,
857        ]}
858    )]
859    // Array type test
860    #[case(
861        r#"{"type": "array", "items": "long"}"#,
862        r#"{"type": "array", "items": "int"}"#,
863        CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
864            SchemaKind::Long,
865            SchemaKind::Float,
866            SchemaKind::Double,
867            SchemaKind::TimeMicros,
868            SchemaKind::TimestampMillis,
869            SchemaKind::TimestampMicros,
870            SchemaKind::TimestampNanos,
871            SchemaKind::LocalTimestampMillis,
872            SchemaKind::LocalTimestampMicros,
873            SchemaKind::LocalTimestampNanos,
874        ]}
875    )]
876    // Date type test
877    #[case(
878        r#"{"type": "string"}"#,
879        r#"{"type": "int", "logicalType": "date"}"#,
880        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
881            SchemaKind::String,
882            SchemaKind::Bytes,
883            SchemaKind::Uuid,
884        ]}
885    )]
886    // time-millis type
887    #[case(
888        r#"{"type": "string"}"#,
889        r#"{"type": "int", "logicalType": "time-millis"}"#,
890        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
891            SchemaKind::String,
892            SchemaKind::Bytes,
893            SchemaKind::Uuid,
894        ]}
895    )]
896    // time-millis type
897    #[case(
898        r#"{"type": "int"}"#,
899        r#"{"type": "long", "logicalType": "time-micros"}"#,
900        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
901            SchemaKind::Int,
902            SchemaKind::Long,
903            SchemaKind::Float,
904            SchemaKind::Double,
905            SchemaKind::Date,
906            SchemaKind::TimeMillis
907        ]}
908    )]
909    // timestamp-nanos type. This test should fail because it is not supported on schema parse_complex
910    // #[case(
911    //     r#"{"type": "string"}"#,
912    //     r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
913    //     CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
914    //         SchemaKind::Int,
915    //         SchemaKind::Long,
916    //         SchemaKind::Float,
917    //         SchemaKind::Double,
918    //         SchemaKind::Date,
919    //         SchemaKind::TimeMillis
920    //     ]}
921    // )]
922    // timestamp-millis type
923    #[case(
924        r#"{"type": "int"}"#,
925        r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
926        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
927            SchemaKind::Int,
928            SchemaKind::Long,
929            SchemaKind::Float,
930            SchemaKind::Double,
931            SchemaKind::Date,
932            SchemaKind::TimeMillis
933        ]}
934    )]
935    // timestamp-micros type
936    #[case(
937        r#"{"type": "int"}"#,
938        r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
939        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
940            SchemaKind::Int,
941            SchemaKind::Long,
942            SchemaKind::Float,
943            SchemaKind::Double,
944            SchemaKind::Date,
945            SchemaKind::TimeMillis
946        ]}
947    )]
948    // local-timestamp-millis type
949    #[case(
950        r#"{"type": "int"}"#,
951        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
952        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
953            SchemaKind::Int,
954            SchemaKind::Long,
955            SchemaKind::Float,
956            SchemaKind::Double,
957            SchemaKind::Date,
958            SchemaKind::TimeMillis
959        ]}
960    )]
961    // local-timestamp-micros type
962    #[case(
963        r#"{"type": "int"}"#,
964        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
965        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
966            SchemaKind::Int,
967            SchemaKind::Long,
968            SchemaKind::Float,
969            SchemaKind::Double,
970            SchemaKind::Date,
971            SchemaKind::TimeMillis
972        ]}
973    )]
974    // local-timestamp-nanos type. This test should fail because it is not supported on schema parse_complex
975    // #[case(
976    //     r#"{"type": "int"}"#,
977    //     r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
978    //     CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
979    //         SchemaKind::Int,
980    //         SchemaKind::Long,
981    //         SchemaKind::Float,
982    //         SchemaKind::Double,
983    //         SchemaKind::Date,
984    //         SchemaKind::TimeMillis
985    //     ]}
986    // )]
987    // When comparing different types we always get Inconclusive
988    #[case(
989        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
990        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
991        CompatibilityError::Inconclusive(String::from("writers_schema"))
992    )]
993    fn test_avro_3950_match_schemas_error(
994        #[case] writer_schema_str: &str,
995        #[case] reader_schema_str: &str,
996        #[case] expected_error: CompatibilityError,
997    ) {
998        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
999        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
1000
1001        assert_eq!(
1002            expected_error,
1003            SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).unwrap_err()
1004        )
1005    }
1006
1007    #[test]
1008    fn test_compatible_reader_writer_pairs() {
1009        let compatible_schemas = vec![
1010            (Schema::Null, Schema::Null),
1011            (Schema::Long, Schema::Int),
1012            (Schema::Float, Schema::Int),
1013            (Schema::Float, Schema::Long),
1014            (Schema::Double, Schema::Long),
1015            (Schema::Double, Schema::Int),
1016            (Schema::Double, Schema::Float),
1017            (Schema::String, Schema::Bytes),
1018            (Schema::Bytes, Schema::String),
1019            // logical types
1020            (Schema::Uuid, Schema::Uuid),
1021            (Schema::Uuid, Schema::String),
1022            (Schema::Date, Schema::Int),
1023            (Schema::TimeMillis, Schema::Int),
1024            (Schema::TimeMicros, Schema::Long),
1025            (Schema::TimestampMillis, Schema::Long),
1026            (Schema::TimestampMicros, Schema::Long),
1027            (Schema::TimestampNanos, Schema::Long),
1028            (Schema::LocalTimestampMillis, Schema::Long),
1029            (Schema::LocalTimestampMicros, Schema::Long),
1030            (Schema::LocalTimestampNanos, Schema::Long),
1031            (Schema::String, Schema::Uuid),
1032            (Schema::Int, Schema::Date),
1033            (Schema::Int, Schema::TimeMillis),
1034            (Schema::Long, Schema::TimeMicros),
1035            (Schema::Long, Schema::TimestampMillis),
1036            (Schema::Long, Schema::TimestampMicros),
1037            (Schema::Long, Schema::TimestampNanos),
1038            (Schema::Long, Schema::LocalTimestampMillis),
1039            (Schema::Long, Schema::LocalTimestampMicros),
1040            (Schema::Long, Schema::LocalTimestampNanos),
1041            (int_array_schema(), int_array_schema()),
1042            (long_array_schema(), int_array_schema()),
1043            (int_map_schema(), int_map_schema()),
1044            (long_map_schema(), int_map_schema()),
1045            (enum1_ab_schema(), enum1_ab_schema()),
1046            (enum1_abc_schema(), enum1_ab_schema()),
1047            (empty_union_schema(), empty_union_schema()),
1048            (int_union_schema(), int_union_schema()),
1049            (int_string_union_schema(), string_int_union_schema()),
1050            (int_union_schema(), empty_union_schema()),
1051            (long_union_schema(), int_union_schema()),
1052            (int_union_schema(), Schema::Int),
1053            (Schema::Int, int_union_schema()),
1054            (empty_record1_schema(), empty_record1_schema()),
1055            (empty_record1_schema(), a_int_record1_schema()),
1056            (a_int_record1_schema(), a_int_record1_schema()),
1057            (a_dint_record1_schema(), a_int_record1_schema()),
1058            (a_dint_record1_schema(), a_dint_record1_schema()),
1059            (a_int_record1_schema(), a_dint_record1_schema()),
1060            (a_long_record1_schema(), a_int_record1_schema()),
1061            (a_int_record1_schema(), a_int_b_int_record1_schema()),
1062            (a_dint_record1_schema(), a_int_b_int_record1_schema()),
1063            (a_int_b_dint_record1_schema(), a_int_record1_schema()),
1064            (a_dint_b_dint_record1_schema(), empty_record1_schema()),
1065            (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
1066            (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
1067            (int_list_record_schema(), int_list_record_schema()),
1068            (long_list_record_schema(), long_list_record_schema()),
1069            (long_list_record_schema(), int_list_record_schema()),
1070            (nested_optional_record(), nested_record()),
1071        ];
1072
1073        assert!(
1074            compatible_schemas
1075                .iter()
1076                .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_ok())
1077        );
1078    }
1079
1080    fn writer_schema() -> Schema {
1081        Schema::parse_str(
1082            r#"
1083      {"type":"record", "name":"Record", "fields":[
1084        {"name":"oldfield1", "type":"int"},
1085        {"name":"oldfield2", "type":"string"}
1086      ]}
1087"#,
1088        )
1089        .unwrap()
1090    }
1091
1092    #[test]
1093    fn test_missing_field() -> TestResult {
1094        let reader_schema = Schema::parse_str(
1095            r#"
1096      {"type":"record", "name":"Record", "fields":[
1097        {"name":"oldfield1", "type":"int"}
1098      ]}
1099"#,
1100        )?;
1101        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok());
1102        assert_eq!(
1103            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1104            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1105        );
1106
1107        Ok(())
1108    }
1109
1110    #[test]
1111    fn test_missing_second_field() -> TestResult {
1112        let reader_schema = Schema::parse_str(
1113            r#"
1114        {"type":"record", "name":"Record", "fields":[
1115          {"name":"oldfield2", "type":"string"}
1116        ]}
1117"#,
1118        )?;
1119        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1120        assert_eq!(
1121            CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
1122            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1123        );
1124
1125        Ok(())
1126    }
1127
1128    #[test]
1129    fn test_all_fields() -> TestResult {
1130        let reader_schema = Schema::parse_str(
1131            r#"
1132        {"type":"record", "name":"Record", "fields":[
1133          {"name":"oldfield1", "type":"int"},
1134          {"name":"oldfield2", "type":"string"}
1135        ]}
1136"#,
1137        )?;
1138        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1139        assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok());
1140
1141        Ok(())
1142    }
1143
1144    #[test]
1145    fn test_new_field_with_default() -> TestResult {
1146        let reader_schema = Schema::parse_str(
1147            r#"
1148        {"type":"record", "name":"Record", "fields":[
1149          {"name":"oldfield1", "type":"int"},
1150          {"name":"newfield1", "type":"int", "default":42}
1151        ]}
1152"#,
1153        )?;
1154        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1155        assert_eq!(
1156            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1157            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1158        );
1159
1160        Ok(())
1161    }
1162
1163    #[test]
1164    fn test_new_field() -> TestResult {
1165        let reader_schema = Schema::parse_str(
1166            r#"
1167        {"type":"record", "name":"Record", "fields":[
1168          {"name":"oldfield1", "type":"int"},
1169          {"name":"newfield1", "type":"int"}
1170        ]}
1171"#,
1172        )?;
1173        assert_eq!(
1174            CompatibilityError::MissingDefaultValue(String::from("newfield1")),
1175            SchemaCompatibility::can_read(&writer_schema(), &reader_schema).unwrap_err()
1176        );
1177        assert_eq!(
1178            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1179            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1180        );
1181
1182        Ok(())
1183    }
1184
1185    #[test]
1186    fn test_array_writer_schema() {
1187        let valid_reader = string_array_schema();
1188        let invalid_reader = string_map_schema();
1189
1190        assert!(SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).is_ok());
1191        assert_eq!(
1192            CompatibilityError::Inconclusive(String::from("writers_schema")),
1193            SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader).unwrap_err()
1194        );
1195    }
1196
1197    #[test]
1198    fn test_primitive_writer_schema() {
1199        let valid_reader = Schema::String;
1200        assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok());
1201        assert_eq!(
1202            CompatibilityError::TypeExpected {
1203                schema_type: String::from("readers_schema"),
1204                expected_type: vec![
1205                    SchemaKind::Int,
1206                    SchemaKind::Long,
1207                    SchemaKind::Float,
1208                    SchemaKind::Double,
1209                    SchemaKind::Date,
1210                    SchemaKind::TimeMillis
1211                ],
1212            },
1213            SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err()
1214        );
1215    }
1216
1217    #[test]
1218    fn test_union_reader_writer_subset_incompatibility() {
1219        // reader union schema must contain all writer union branches
1220        let union_writer = union_schema(vec![Schema::Int, Schema::String]);
1221        let union_reader = union_schema(vec![Schema::String]);
1222
1223        assert_eq!(
1224            CompatibilityError::MissingUnionElements,
1225            SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap_err()
1226        );
1227        assert!(SchemaCompatibility::can_read(&union_reader, &union_writer).is_ok());
1228    }
1229
1230    #[test]
1231    fn test_incompatible_record_field() -> TestResult {
1232        let string_schema = Schema::parse_str(
1233            r#"
1234        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1235            {"name":"field1", "type":"string"}
1236        ]}
1237        "#,
1238        )?;
1239
1240        let int_schema = Schema::parse_str(
1241            r#"
1242              {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1243                {"name":"field1", "type":"int"}
1244              ]}
1245        "#,
1246        )?;
1247
1248        assert_eq!(
1249            CompatibilityError::FieldTypeMismatch(
1250                "field1".to_owned(),
1251                Box::new(CompatibilityError::TypeExpected {
1252                    schema_type: "readers_schema".to_owned(),
1253                    expected_type: vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Uuid]
1254                })
1255            ),
1256            SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err()
1257        );
1258
1259        Ok(())
1260    }
1261
1262    #[test]
1263    fn test_enum_symbols() -> TestResult {
1264        let enum_schema1 = Schema::parse_str(
1265            r#"
1266      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
1267"#,
1268        )?;
1269        let enum_schema2 =
1270            Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
1271        assert_eq!(
1272            CompatibilityError::MissingSymbols,
1273            SchemaCompatibility::can_read(&enum_schema2, &enum_schema1).unwrap_err()
1274        );
1275        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2).is_ok());
1276
1277        Ok(())
1278    }
1279
1280    fn point_2d_schema() -> Schema {
1281        Schema::parse_str(
1282            r#"
1283      {"type":"record", "name":"Point2D", "fields":[
1284        {"name":"x", "type":"double"},
1285        {"name":"y", "type":"double"}
1286      ]}
1287    "#,
1288        )
1289        .unwrap()
1290    }
1291
1292    fn point_2d_fullname_schema() -> Schema {
1293        Schema::parse_str(
1294            r#"
1295      {"type":"record", "name":"Point", "namespace":"written", "fields":[
1296        {"name":"x", "type":"double"},
1297        {"name":"y", "type":"double"}
1298      ]}
1299    "#,
1300        )
1301        .unwrap()
1302    }
1303
1304    fn point_3d_no_default_schema() -> Schema {
1305        Schema::parse_str(
1306            r#"
1307      {"type":"record", "name":"Point", "fields":[
1308        {"name":"x", "type":"double"},
1309        {"name":"y", "type":"double"},
1310        {"name":"z", "type":"double"}
1311      ]}
1312    "#,
1313        )
1314        .unwrap()
1315    }
1316
1317    fn point_3d_schema() -> Schema {
1318        Schema::parse_str(
1319            r#"
1320      {"type":"record", "name":"Point3D", "fields":[
1321        {"name":"x", "type":"double"},
1322        {"name":"y", "type":"double"},
1323        {"name":"z", "type":"double", "default": 0.0}
1324      ]}
1325    "#,
1326        )
1327        .unwrap()
1328    }
1329
1330    fn point_3d_match_name_schema() -> Schema {
1331        Schema::parse_str(
1332            r#"
1333      {"type":"record", "name":"Point", "fields":[
1334        {"name":"x", "type":"double"},
1335        {"name":"y", "type":"double"},
1336        {"name":"z", "type":"double", "default": 0.0}
1337      ]}
1338    "#,
1339        )
1340        .unwrap()
1341    }
1342
1343    #[test]
1344    fn test_union_resolution_no_structure_match() {
1345        // short name match, but no structure match
1346        let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
1347        assert_eq!(
1348            CompatibilityError::MissingUnionElements,
1349            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1350        );
1351    }
1352
1353    #[test]
1354    fn test_union_resolution_first_structure_match_2d() {
1355        // multiple structure matches with no name matches
1356        let read_schema = union_schema(vec![
1357            Schema::Null,
1358            point_3d_no_default_schema(),
1359            point_2d_schema(),
1360            point_3d_schema(),
1361        ]);
1362        assert_eq!(
1363            CompatibilityError::MissingUnionElements,
1364            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1365        );
1366    }
1367
1368    #[test]
1369    fn test_union_resolution_first_structure_match_3d() {
1370        // multiple structure matches with no name matches
1371        let read_schema = union_schema(vec![
1372            Schema::Null,
1373            point_3d_no_default_schema(),
1374            point_3d_schema(),
1375            point_2d_schema(),
1376        ]);
1377        assert_eq!(
1378            CompatibilityError::MissingUnionElements,
1379            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1380        );
1381    }
1382
1383    #[test]
1384    fn test_union_resolution_named_structure_match() {
1385        // multiple structure matches with a short name match
1386        let read_schema = union_schema(vec![
1387            Schema::Null,
1388            point_2d_schema(),
1389            point_3d_match_name_schema(),
1390            point_3d_schema(),
1391        ]);
1392        assert_eq!(
1393            CompatibilityError::MissingUnionElements,
1394            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1395        );
1396    }
1397
1398    #[test]
1399    fn test_union_resolution_full_name_match() {
1400        // there is a full name match that should be chosen
1401        let read_schema = union_schema(vec![
1402            Schema::Null,
1403            point_2d_schema(),
1404            point_3d_match_name_schema(),
1405            point_3d_schema(),
1406            point_2d_fullname_schema(),
1407        ]);
1408        assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).is_ok());
1409    }
1410
1411    #[test]
1412    fn test_avro_3772_enum_default() -> TestResult {
1413        let writer_raw_schema = r#"
1414        {
1415          "type": "record",
1416          "name": "test",
1417          "fields": [
1418            {"name": "a", "type": "long", "default": 42},
1419            {"name": "b", "type": "string"},
1420            {
1421              "name": "c",
1422              "type": {
1423                "type": "enum",
1424                "name": "suit",
1425                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1426                "default": "spades"
1427              }
1428            }
1429          ]
1430        }
1431        "#;
1432
1433        let reader_raw_schema = r#"
1434        {
1435          "type": "record",
1436          "name": "test",
1437          "fields": [
1438            {"name": "a", "type": "long", "default": 42},
1439            {"name": "b", "type": "string"},
1440            {
1441              "name": "c",
1442              "type": {
1443                 "type": "enum",
1444                 "name": "suit",
1445                 "symbols": ["diamonds", "spades", "ninja", "hearts"],
1446                 "default": "spades"
1447              }
1448            }
1449          ]
1450        }
1451      "#;
1452        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1453        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1454        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
1455        let mut record = Record::new(writer.schema()).unwrap();
1456        record.put("a", 27i64);
1457        record.put("b", "foo");
1458        record.put("c", "clubs");
1459        writer.append(record).unwrap();
1460        let input = writer.into_inner()?;
1461        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1462        assert_eq!(
1463            reader.next().unwrap().unwrap(),
1464            Value::Record(vec![
1465                ("a".to_string(), Value::Long(27)),
1466                ("b".to_string(), Value::String("foo".to_string())),
1467                ("c".to_string(), Value::Enum(1, "spades".to_string())),
1468            ])
1469        );
1470        assert!(reader.next().is_none());
1471
1472        Ok(())
1473    }
1474
1475    #[test]
1476    fn test_avro_3772_enum_default_less_symbols() -> TestResult {
1477        let writer_raw_schema = r#"
1478        {
1479          "type": "record",
1480          "name": "test",
1481          "fields": [
1482            {"name": "a", "type": "long", "default": 42},
1483            {"name": "b", "type": "string"},
1484            {
1485              "name": "c",
1486              "type": {
1487                "type": "enum",
1488                "name": "suit",
1489                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1490                "default": "spades"
1491              }
1492            }
1493          ]
1494        }
1495        "#;
1496
1497        let reader_raw_schema = r#"
1498        {
1499          "type": "record",
1500          "name": "test",
1501          "fields": [
1502            {"name": "a", "type": "long", "default": 42},
1503            {"name": "b", "type": "string"},
1504            {
1505              "name": "c",
1506              "type": {
1507                 "type": "enum",
1508                  "name": "suit",
1509                  "symbols": ["hearts", "spades"],
1510                  "default": "spades"
1511              }
1512            }
1513          ]
1514        }
1515      "#;
1516        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1517        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1518        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
1519        let mut record = Record::new(writer.schema()).unwrap();
1520        record.put("a", 27i64);
1521        record.put("b", "foo");
1522        record.put("c", "hearts");
1523        writer.append(record).unwrap();
1524        let input = writer.into_inner()?;
1525        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1526        assert_eq!(
1527            reader.next().unwrap().unwrap(),
1528            Value::Record(vec![
1529                ("a".to_string(), Value::Long(27)),
1530                ("b".to_string(), Value::String("foo".to_string())),
1531                ("c".to_string(), Value::Enum(0, "hearts".to_string())),
1532            ])
1533        );
1534        assert!(reader.next().is_none());
1535
1536        Ok(())
1537    }
1538
1539    #[test]
1540    fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
1541    {
1542        let schema_v1 = Schema::parse_str(
1543            r#"
1544        {
1545            "type": "record",
1546            "name": "Conference",
1547            "namespace": "advdaba",
1548            "fields": [
1549                {"type": "string", "name": "name"},
1550                {"type": "long", "name": "date"}
1551            ]
1552        }"#,
1553        )?;
1554
1555        let schema_v2 = Schema::parse_str(
1556            r#"
1557        {
1558            "type": "record",
1559            "name": "Conference",
1560            "namespace": "advdaba",
1561            "fields": [
1562                {"type": "string", "name": "name"},
1563                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1564            ]
1565        }"#,
1566        )?;
1567
1568        assert!(SchemaCompatibility::mutual_read(&schema_v1, &schema_v2).is_ok());
1569
1570        Ok(())
1571    }
1572
1573    #[test]
1574    fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
1575        let schema_v1 = Schema::parse_str(
1576            r#"
1577        {
1578            "type": "record",
1579            "name": "Conference",
1580            "namespace": "advdaba",
1581            "fields": [
1582                {"type": "string", "name": "name"},
1583                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1584            ]
1585        }"#,
1586        )?;
1587
1588        let schema_v2 = Schema::parse_str(
1589            r#"
1590        {
1591            "type": "record",
1592            "name": "Conference",
1593            "namespace": "advdaba",
1594            "fields": [
1595                {"type": "string", "name": "name"},
1596                {"type": "long", "name": "time"}
1597            ]
1598        }"#,
1599        )?;
1600
1601        assert!(SchemaCompatibility::can_read(&schema_v2, &schema_v1).is_ok());
1602        assert_eq!(
1603            CompatibilityError::MissingDefaultValue(String::from("time")),
1604            SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err()
1605        );
1606
1607        Ok(())
1608    }
1609
1610    #[test]
1611    fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
1612        let schemas = [
1613            // Record schemas
1614            (
1615                Schema::parse_str(
1616                    r#"{
1617              "type": "record",
1618              "name": "Statistics",
1619              "fields": [
1620                { "name": "success", "type": "int" },
1621                { "name": "fail", "type": "int" },
1622                { "name": "time", "type": "string" },
1623                { "name": "max", "type": "int", "default": 0 }
1624              ]
1625            }"#,
1626                )?,
1627                Schema::parse_str(
1628                    r#"{
1629              "type": "record",
1630              "name": "Statistics",
1631              "namespace": "my.namespace",
1632              "fields": [
1633                { "name": "success", "type": "int" },
1634                { "name": "fail", "type": "int" },
1635                { "name": "time", "type": "string" },
1636                { "name": "average", "type": "int", "default": 0}
1637              ]
1638            }"#,
1639                )?,
1640            ),
1641            // Enum schemas
1642            (
1643                Schema::parse_str(
1644                    r#"{
1645                    "type": "enum",
1646                    "name": "Suit",
1647                    "symbols": ["diamonds", "spades", "clubs"]
1648                }"#,
1649                )?,
1650                Schema::parse_str(
1651                    r#"{
1652                    "type": "enum",
1653                    "name": "Suit",
1654                    "namespace": "my.namespace",
1655                    "symbols": ["diamonds", "spades", "clubs", "hearts"]
1656                }"#,
1657                )?,
1658            ),
1659            // Fixed schemas
1660            (
1661                Schema::parse_str(
1662                    r#"{
1663                    "type": "fixed",
1664                    "name": "EmployeeId",
1665                    "size": 16
1666                }"#,
1667                )?,
1668                Schema::parse_str(
1669                    r#"{
1670                    "type": "fixed",
1671                    "name": "EmployeeId",
1672                    "namespace": "my.namespace",
1673                    "size": 16
1674                }"#,
1675                )?,
1676            ),
1677        ];
1678
1679        for (schema_1, schema_2) in schemas {
1680            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1681        }
1682
1683        Ok(())
1684    }
1685
1686    #[test]
1687    fn test_can_read_compatibility_errors() -> TestResult {
1688        let schemas = [
1689            (
1690                Schema::parse_str(
1691                    r#"{
1692                    "type": "record",
1693                    "name": "StatisticsMap",
1694                    "fields": [
1695                        {"name": "average", "type": "int", "default": 0},
1696                        {"name": "success", "type": {"type": "map", "values": "int"}}
1697                    ]
1698                }"#,
1699                )?,
1700                Schema::parse_str(
1701                    r#"{
1702                    "type": "record",
1703                    "name": "StatisticsMap",
1704                    "fields": [
1705                        {"name": "average", "type": "int", "default": 0},
1706                        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
1707                    ]
1708                }"#,
1709                )?,
1710                "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema",
1711            ),
1712            (
1713                Schema::parse_str(
1714                    r#"{
1715                        "type": "record",
1716                        "name": "StatisticsArray",
1717                        "fields": [
1718                            {"name": "max_values", "type": {"type": "array", "items": "int"}}
1719                        ]
1720                    }"#,
1721                )?,
1722                Schema::parse_str(
1723                    r#"{
1724                        "type": "record",
1725                        "name": "StatisticsArray",
1726                        "fields": [
1727                            {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
1728                        ]
1729                    }"#,
1730                )?,
1731                "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema",
1732            ),
1733        ];
1734
1735        for (schema_1, schema_2, error) in schemas {
1736            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1737            assert_eq!(
1738                error,
1739                SchemaCompatibility::can_read(&schema_2, &schema_1)
1740                    .unwrap_err()
1741                    .to_string()
1742            );
1743        }
1744
1745        Ok(())
1746    }
1747
1748    #[test]
1749    fn avro_3974_can_read_schema_references() -> TestResult {
1750        let schema_strs = vec![
1751            r#"{
1752          "type": "record",
1753          "name": "Child",
1754          "namespace": "avro",
1755          "fields": [
1756            {
1757              "name": "val",
1758              "type": "int"
1759            }
1760          ]
1761        }
1762        "#,
1763            r#"{
1764          "type": "record",
1765          "name": "Parent",
1766          "namespace": "avro",
1767          "fields": [
1768            {
1769              "name": "child",
1770              "type": "avro.Child"
1771            }
1772          ]
1773        }
1774        "#,
1775        ];
1776
1777        let schemas = Schema::parse_list(schema_strs).unwrap();
1778        SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1779
1780        Ok(())
1781    }
1782}