apache_avro/
schema_compatibility.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for checking schema compatibility
19use crate::schema::UuidSchema;
20use crate::{
21    error::CompatibilityError,
22    schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
23};
24use std::{
25    collections::{HashSet, hash_map::DefaultHasher},
26    hash::Hasher,
27    ptr,
28};
29
30fn match_ref_schemas(
31    writers_schema: &Schema,
32    readers_schema: &Schema,
33) -> Result<(), CompatibilityError> {
34    match (readers_schema, writers_schema) {
35        (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
36            if r_name == w_name {
37                Ok(())
38            } else {
39                Err(CompatibilityError::NameMismatch {
40                    writer_name: w_name.fullname(None),
41                    reader_name: r_name.fullname(None),
42                })
43            }
44        }
45        _ => Err(CompatibilityError::WrongType {
46            writer_schema_type: format!("{writers_schema:#?}"),
47            reader_schema_type: format!("{readers_schema:#?}"),
48        }),
49    }
50}
51
/// Namespace type for the schema compatibility checks.
///
/// It carries no data; the checks are exposed as associated functions such as
/// `SchemaCompatibility::can_read` and `SchemaCompatibility::mutual_read`.
pub struct SchemaCompatibility;
53
/// Stateful helper that performs the recursive compatibility walk.
struct Checker {
    /// (writer, reader) pairs that have already been visited. Each element of the
    /// pair is a hash of the schema's address, see `recursion_in_progress`.
    recursion: HashSet<(u64, u64)>,
}
57
58impl Checker {
59    /// Create a new checker, with recursion set to an empty set.
60    pub(crate) fn new() -> Self {
61        Self {
62            recursion: HashSet::new(),
63        }
64    }
65
66    pub(crate) fn can_read(
67        &mut self,
68        writers_schema: &Schema,
69        readers_schema: &Schema,
70    ) -> Result<(), CompatibilityError> {
71        self.full_match_schemas(writers_schema, readers_schema)
72    }
73
74    pub(crate) fn full_match_schemas(
75        &mut self,
76        writers_schema: &Schema,
77        readers_schema: &Schema,
78    ) -> Result<(), CompatibilityError> {
79        if self.recursion_in_progress(writers_schema, readers_schema) {
80            return Ok(());
81        }
82
83        SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
84
85        let w_type = SchemaKind::from(writers_schema);
86        let r_type = SchemaKind::from(readers_schema);
87
88        if w_type != SchemaKind::Union
89            && (r_type.is_primitive()
90                || r_type == SchemaKind::Fixed
91                || r_type == SchemaKind::Uuid
92                || r_type == SchemaKind::Date
93                || r_type == SchemaKind::TimeMillis
94                || r_type == SchemaKind::TimeMicros
95                || r_type == SchemaKind::TimestampMillis
96                || r_type == SchemaKind::TimestampMicros
97                || r_type == SchemaKind::TimestampNanos
98                || r_type == SchemaKind::LocalTimestampMillis
99                || r_type == SchemaKind::LocalTimestampMicros
100                || r_type == SchemaKind::LocalTimestampNanos)
101        {
102            return Ok(());
103        }
104
105        match r_type {
106            SchemaKind::Ref => match_ref_schemas(writers_schema, readers_schema),
107            SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
108            SchemaKind::Map => {
109                if let Schema::Map(w_m) = writers_schema {
110                    match readers_schema {
111                        Schema::Map(r_m) => self.full_match_schemas(&w_m.types, &r_m.types),
112                        _ => Err(CompatibilityError::WrongType {
113                            writer_schema_type: format!("{writers_schema:#?}"),
114                            reader_schema_type: format!("{readers_schema:#?}"),
115                        }),
116                    }
117                } else {
118                    Err(CompatibilityError::TypeExpected {
119                        schema_type: String::from("writers_schema"),
120                        expected_type: vec![SchemaKind::Record],
121                    })
122                }
123            }
124            SchemaKind::Array => {
125                if let Schema::Array(w_a) = writers_schema {
126                    match readers_schema {
127                        Schema::Array(r_a) => self.full_match_schemas(&w_a.items, &r_a.items),
128                        _ => Err(CompatibilityError::WrongType {
129                            writer_schema_type: format!("{writers_schema:#?}"),
130                            reader_schema_type: format!("{readers_schema:#?}"),
131                        }),
132                    }
133                } else {
134                    Err(CompatibilityError::TypeExpected {
135                        schema_type: String::from("writers_schema"),
136                        expected_type: vec![SchemaKind::Array],
137                    })
138                }
139            }
140            SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
141            SchemaKind::Enum => {
142                // reader's symbols must contain all writer's symbols
143                if let Schema::Enum(EnumSchema {
144                    symbols: w_symbols, ..
145                }) = writers_schema
146                {
147                    if let Schema::Enum(EnumSchema {
148                        symbols: r_symbols, ..
149                    }) = readers_schema
150                    {
151                        if w_symbols.iter().all(|e| r_symbols.contains(e)) {
152                            return Ok(());
153                        }
154                    }
155                }
156                Err(CompatibilityError::MissingSymbols)
157            }
158            _ => {
159                if w_type == SchemaKind::Union {
160                    if let Schema::Union(r) = writers_schema {
161                        if r.schemas.len() == 1 {
162                            return self.full_match_schemas(&r.schemas[0], readers_schema);
163                        }
164                    }
165                }
166                Err(CompatibilityError::Inconclusive(String::from(
167                    "writers_schema",
168                )))
169            }
170        }
171    }
172
173    fn match_record_schemas(
174        &mut self,
175        writers_schema: &Schema,
176        readers_schema: &Schema,
177    ) -> Result<(), CompatibilityError> {
178        let w_type = SchemaKind::from(writers_schema);
179
180        if w_type == SchemaKind::Union {
181            return Err(CompatibilityError::TypeExpected {
182                schema_type: String::from("writers_schema"),
183                expected_type: vec![SchemaKind::Record],
184            });
185        }
186
187        if let Schema::Record(RecordSchema {
188            fields: w_fields,
189            lookup: w_lookup,
190            ..
191        }) = writers_schema
192        {
193            if let Schema::Record(RecordSchema {
194                fields: r_fields, ..
195            }) = readers_schema
196            {
197                for field in r_fields.iter() {
198                    // get all field names in a vector (field.name + aliases)
199                    let mut fields_names = vec![&field.name];
200                    if let Some(ref aliases) = field.aliases {
201                        for alias in aliases {
202                            fields_names.push(alias);
203                        }
204                    }
205
206                    // Check whether any of the possible fields names are in the writer schema.
207                    // If the field was found, then it must have the exact same name with the writer field,
208                    // otherwise we would have a false positive with the writers aliases
209                    let position = fields_names.iter().find_map(|field_name| {
210                        if let Some(pos) = w_lookup.get(*field_name) {
211                            if &w_fields[*pos].name == *field_name {
212                                return Some(pos);
213                            }
214                        }
215                        None
216                    });
217
218                    match position {
219                        Some(pos) => {
220                            if let Err(err) =
221                                self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
222                            {
223                                return Err(CompatibilityError::FieldTypeMismatch(
224                                    field.name.clone(),
225                                    Box::new(err),
226                                ));
227                            }
228                        }
229                        _ => {
230                            if field.default.is_none() {
231                                return Err(CompatibilityError::MissingDefaultValue(
232                                    field.name.clone(),
233                                ));
234                            }
235                        }
236                    }
237                }
238            }
239        }
240        Ok(())
241    }
242
243    fn match_union_schemas(
244        &mut self,
245        writers_schema: &Schema,
246        readers_schema: &Schema,
247    ) -> Result<(), CompatibilityError> {
248        if let Schema::Union(u) = writers_schema {
249            if u.schemas
250                .iter()
251                .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok())
252            {
253                return Ok(());
254            } else {
255                return Err(CompatibilityError::MissingUnionElements);
256            }
257        } else if let Schema::Union(u) = readers_schema {
258            // This check is needed because the writer_schema can be not union
259            // but the type can be contain in the union of the reader schema
260            // e.g. writer_schema is string and reader_schema is [string, int]
261            if u.schemas
262                .iter()
263                .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok())
264            {
265                return Ok(());
266            }
267        }
268        Err(CompatibilityError::MissingUnionElements)
269    }
270
271    fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
272        let mut hasher = DefaultHasher::new();
273        ptr::hash(writers_schema, &mut hasher);
274        let w_hash = hasher.finish();
275
276        hasher = DefaultHasher::new();
277        ptr::hash(readers_schema, &mut hasher);
278        let r_hash = hasher.finish();
279
280        let key = (w_hash, r_hash);
281        // This is a shortcut to add if not exists *and* return false. It will return true
282        // if it was able to insert.
283        !self.recursion.insert(key)
284    }
285}
286
287impl SchemaCompatibility {
288    /// `can_read` performs a full, recursive check that a datum written using the
289    /// writers_schema can be read using the readers_schema.
290    pub fn can_read(
291        writers_schema: &Schema,
292        readers_schema: &Schema,
293    ) -> Result<(), CompatibilityError> {
294        let mut c = Checker::new();
295        c.can_read(writers_schema, readers_schema)
296    }
297
298    /// `mutual_read` performs a full, recursive check that a datum written using either
299    /// the writers_schema or the readers_schema can be read using the other schema.
300    pub fn mutual_read(
301        writers_schema: &Schema,
302        readers_schema: &Schema,
303    ) -> Result<(), CompatibilityError> {
304        SchemaCompatibility::can_read(writers_schema, readers_schema)?;
305        SchemaCompatibility::can_read(readers_schema, writers_schema)
306    }
307
308    ///  `match_schemas` performs a basic check that a datum written with the
309    ///  writers_schema could be read using the readers_schema. This check only includes
310    ///  matching the types, including schema promotion, and matching the full name for
311    ///  named types. Aliases for named types are not supported here, and the rust
312    ///  implementation of Avro in general does not include support for aliases (I think).
313    pub(crate) fn match_schemas(
314        writers_schema: &Schema,
315        readers_schema: &Schema,
316    ) -> Result<(), CompatibilityError> {
317        fn check_reader_type_multi(
318            reader_type: SchemaKind,
319            allowed_reader_types: Vec<SchemaKind>,
320            writer_type: SchemaKind,
321        ) -> Result<(), CompatibilityError> {
322            if allowed_reader_types.contains(&reader_type) {
323                Ok(())
324            } else {
325                let mut allowed_types: Vec<SchemaKind> = vec![writer_type];
326                allowed_types.extend_from_slice(allowed_reader_types.as_slice());
327                Err(CompatibilityError::TypeExpected {
328                    schema_type: String::from("readers_schema"),
329                    expected_type: allowed_types,
330                })
331            }
332        }
333
334        fn check_reader_type(
335            reader_type: SchemaKind,
336            allowed_reader_type: SchemaKind,
337            writer_type: SchemaKind,
338        ) -> Result<(), CompatibilityError> {
339            if reader_type == allowed_reader_type {
340                Ok(())
341            } else {
342                Err(CompatibilityError::TypeExpected {
343                    schema_type: String::from("readers_schema"),
344                    expected_type: vec![writer_type, allowed_reader_type],
345                })
346            }
347        }
348
349        fn check_writer_type(
350            writers_schema: &Schema,
351            allowed_schema: &Schema,
352            expected_schema_types: Vec<SchemaKind>,
353        ) -> Result<(), CompatibilityError> {
354            if *allowed_schema == *writers_schema {
355                Ok(())
356            } else {
357                Err(CompatibilityError::TypeExpected {
358                    schema_type: String::from("writers_schema"),
359                    expected_type: expected_schema_types,
360                })
361            }
362        }
363
364        let w_type = SchemaKind::from(writers_schema);
365        let r_type = SchemaKind::from(readers_schema);
366
367        if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
368            return Ok(());
369        }
370
371        if w_type == r_type {
372            if r_type.is_primitive() {
373                return Ok(());
374            }
375
376            match r_type {
377                SchemaKind::Record | SchemaKind::Enum => {
378                    let msg = format!("A {readers_schema} type must always has a name");
379                    let writers_name = writers_schema.name().expect(&msg);
380                    let readers_name = readers_schema.name().expect(&msg);
381
382                    if writers_name.name == readers_name.name {
383                        return Ok(());
384                    }
385
386                    return Err(CompatibilityError::NameMismatch {
387                        writer_name: writers_name.name.clone(),
388                        reader_name: readers_name.name.clone(),
389                    });
390                }
391                SchemaKind::Fixed => {
392                    if let Schema::Fixed(FixedSchema {
393                        name: w_name,
394                        aliases: _,
395                        doc: _w_doc,
396                        size: w_size,
397                        default: _w_default,
398                        attributes: _,
399                    }) = writers_schema
400                    {
401                        if let Schema::Fixed(FixedSchema {
402                            name: r_name,
403                            aliases: _,
404                            doc: _r_doc,
405                            size: r_size,
406                            default: _r_default,
407                            attributes: _,
408                        }) = readers_schema
409                        {
410                            return (w_name.name == r_name.name && w_size == r_size)
411                                .then_some(())
412                                .ok_or(CompatibilityError::FixedMismatch);
413                        }
414                    }
415                }
416                SchemaKind::Map => {
417                    if let Schema::Map(w_m) = writers_schema {
418                        if let Schema::Map(r_m) = readers_schema {
419                            return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
420                        }
421                    }
422                }
423                SchemaKind::Array => {
424                    if let Schema::Array(w_a) = writers_schema {
425                        if let Schema::Array(r_a) = readers_schema {
426                            return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
427                        }
428                    }
429                }
430                SchemaKind::Uuid => match readers_schema {
431                    Schema::Uuid(UuidSchema::Bytes) => {
432                        return check_writer_type(
433                            writers_schema,
434                            readers_schema,
435                            vec![r_type, SchemaKind::Bytes],
436                        );
437                    }
438                    Schema::Uuid(UuidSchema::String) => {
439                        return check_writer_type(
440                            writers_schema,
441                            readers_schema,
442                            vec![r_type, SchemaKind::String],
443                        );
444                    }
445                    Schema::Uuid(UuidSchema::Fixed(FixedSchema {
446                        name: r_name,
447                        size: r_size,
448                        ..
449                    })) => match writers_schema {
450                        Schema::Uuid(UuidSchema::Fixed(FixedSchema {
451                            name: w_name,
452                            size: w_size,
453                            ..
454                        }))
455                        | Schema::Fixed(FixedSchema {
456                            name: w_name,
457                            size: w_size,
458                            ..
459                        }) => {
460                            return (w_name.name == r_name.name && w_size == r_size)
461                                .then_some(())
462                                .ok_or(CompatibilityError::FixedMismatch);
463                        }
464                        _ => {
465                            return Err(CompatibilityError::TypeExpected {
466                                schema_type: String::from("writers_schema"),
467                                expected_type: vec![SchemaKind::Uuid, SchemaKind::Fixed],
468                            });
469                        }
470                    },
471                    Schema::Null
472                    | Schema::Boolean
473                    | Schema::Int
474                    | Schema::Long
475                    | Schema::Float
476                    | Schema::Double
477                    | Schema::Bytes
478                    | Schema::String
479                    | Schema::Array(_)
480                    | Schema::Map(_)
481                    | Schema::Union(_)
482                    | Schema::Record(_)
483                    | Schema::Enum(_)
484                    | Schema::Fixed(_)
485                    | Schema::Decimal(_)
486                    | Schema::BigDecimal
487                    | Schema::Date
488                    | Schema::TimeMillis
489                    | Schema::TimeMicros
490                    | Schema::TimestampMillis
491                    | Schema::TimestampMicros
492                    | Schema::TimestampNanos
493                    | Schema::LocalTimestampMillis
494                    | Schema::LocalTimestampMicros
495                    | Schema::LocalTimestampNanos
496                    | Schema::Duration
497                    | Schema::Ref { .. } => {
498                        unreachable!("SchemaKind::Uuid can only be a Schema::Uuid")
499                    }
500                },
501                SchemaKind::Date | SchemaKind::TimeMillis => {
502                    return check_writer_type(
503                        writers_schema,
504                        readers_schema,
505                        vec![r_type, SchemaKind::Int],
506                    );
507                }
508                SchemaKind::TimeMicros
509                | SchemaKind::TimestampNanos
510                | SchemaKind::TimestampMillis
511                | SchemaKind::TimestampMicros
512                | SchemaKind::LocalTimestampMillis
513                | SchemaKind::LocalTimestampMicros
514                | SchemaKind::LocalTimestampNanos => {
515                    return check_writer_type(
516                        writers_schema,
517                        readers_schema,
518                        vec![r_type, SchemaKind::Long],
519                    );
520                }
521                SchemaKind::Duration => {
522                    return Ok(());
523                }
524                SchemaKind::Ref => return match_ref_schemas(writers_schema, readers_schema),
525                _ => {
526                    return Err(CompatibilityError::Inconclusive(String::from(
527                        "readers_schema",
528                    )));
529                }
530            };
531        }
532
533        // Here are the checks for primitive types
534        match w_type {
535            SchemaKind::Int => check_reader_type_multi(
536                r_type,
537                vec![
538                    SchemaKind::Long,
539                    SchemaKind::Float,
540                    SchemaKind::Double,
541                    SchemaKind::Date,
542                    SchemaKind::TimeMillis,
543                ],
544                w_type,
545            ),
546            SchemaKind::Long => check_reader_type_multi(
547                r_type,
548                vec![
549                    SchemaKind::Float,
550                    SchemaKind::Double,
551                    SchemaKind::TimeMicros,
552                    SchemaKind::TimestampMillis,
553                    SchemaKind::TimestampMicros,
554                    SchemaKind::TimestampNanos,
555                    SchemaKind::LocalTimestampMillis,
556                    SchemaKind::LocalTimestampMicros,
557                    SchemaKind::LocalTimestampNanos,
558                ],
559                w_type,
560            ),
561            SchemaKind::Float => {
562                check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type)
563            }
564            SchemaKind::String => {
565                check_reader_type_multi(r_type, vec![SchemaKind::Bytes, SchemaKind::Uuid], w_type)
566            }
567            SchemaKind::Bytes => {
568                check_reader_type_multi(r_type, vec![SchemaKind::String, SchemaKind::Uuid], w_type)
569            }
570            SchemaKind::Uuid => check_reader_type_multi(
571                r_type,
572                vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Fixed],
573                w_type,
574            ),
575            SchemaKind::Fixed => check_reader_type_multi(
576                r_type,
577                vec![SchemaKind::Duration, SchemaKind::Uuid],
578                w_type,
579            ),
580            SchemaKind::Date | SchemaKind::TimeMillis => {
581                check_reader_type(r_type, SchemaKind::Int, w_type)
582            }
583            SchemaKind::TimeMicros
584            | SchemaKind::TimestampMicros
585            | SchemaKind::TimestampMillis
586            | SchemaKind::TimestampNanos
587            | SchemaKind::LocalTimestampMillis
588            | SchemaKind::LocalTimestampMicros
589            | SchemaKind::LocalTimestampNanos => {
590                check_reader_type(r_type, SchemaKind::Long, w_type)
591            }
592            _ => Err(CompatibilityError::Inconclusive(String::from(
593                "writers_schema",
594            ))),
595        }
596    }
597}
598
599#[cfg(test)]
600mod tests {
601    use super::*;
602    use crate::schema::{Name, UuidSchema};
603    use crate::{
604        Codec, Reader, Writer,
605        types::{Record, Value},
606    };
607    use apache_avro_test_helper::TestResult;
608    use rstest::*;
609
610    fn int_array_schema() -> Schema {
611        Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
612    }
613
614    fn long_array_schema() -> Schema {
615        Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
616    }
617
618    fn string_array_schema() -> Schema {
619        Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
620    }
621
622    fn int_map_schema() -> Schema {
623        Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
624    }
625
626    fn long_map_schema() -> Schema {
627        Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
628    }
629
630    fn string_map_schema() -> Schema {
631        Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
632    }
633
634    fn enum1_ab_schema() -> Schema {
635        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
636    }
637
638    fn enum1_abc_schema() -> Schema {
639        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
640    }
641
642    fn enum1_bc_schema() -> Schema {
643        Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
644    }
645
646    fn enum2_ab_schema() -> Schema {
647        Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
648    }
649
650    fn empty_record1_schema() -> Schema {
651        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
652    }
653
654    fn empty_record2_schema() -> Schema {
655        Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
656    }
657
658    fn a_int_record1_schema() -> Schema {
659        Schema::parse_str(
660            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
661        )
662        .unwrap()
663    }
664
665    fn a_long_record1_schema() -> Schema {
666        Schema::parse_str(
667            r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
668        )
669        .unwrap()
670    }
671
672    fn a_int_b_int_record1_schema() -> Schema {
673        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
674    }
675
676    fn a_dint_record1_schema() -> Schema {
677        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
678    }
679
680    fn a_int_b_dint_record1_schema() -> Schema {
681        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
682    }
683
684    fn a_dint_b_dint_record1_schema() -> Schema {
685        Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
686    }
687
688    fn nested_record() -> Schema {
689        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
690    }
691
692    fn nested_optional_record() -> Schema {
693        Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
694    }
695
696    fn int_list_record_schema() -> Schema {
697        Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
698    }
699
700    fn long_list_record_schema() -> Schema {
701        Schema::parse_str(
702            r#"
703      {
704        "type":"record", "name":"List", "fields": [
705          {"name": "head", "type": "long"},
706          {"name": "tail", "type": "array", "items": "long"}
707      ]}
708"#,
709        )
710        .unwrap()
711    }
712
713    fn union_schema(schemas: Vec<Schema>) -> Schema {
714        let schema_string = schemas
715            .iter()
716            .map(|s| s.canonical_form())
717            .collect::<Vec<String>>()
718            .join(",");
719        Schema::parse_str(&format!("[{schema_string}]")).unwrap()
720    }
721
722    fn empty_union_schema() -> Schema {
723        union_schema(vec![])
724    }
725
726    // unused
727    // fn null_union_schema() -> Schema { union_schema(vec![Schema::Null]) }
728
729    fn int_union_schema() -> Schema {
730        union_schema(vec![Schema::Int])
731    }
732
733    fn long_union_schema() -> Schema {
734        union_schema(vec![Schema::Long])
735    }
736
737    fn string_union_schema() -> Schema {
738        union_schema(vec![Schema::String])
739    }
740
741    fn int_string_union_schema() -> Schema {
742        union_schema(vec![Schema::Int, Schema::String])
743    }
744
745    fn string_int_union_schema() -> Schema {
746        union_schema(vec![Schema::String, Schema::Int])
747    }
748
749    #[test]
750    fn test_broken() {
751        assert_eq!(
752            CompatibilityError::MissingUnionElements,
753            SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema())
754                .unwrap_err()
755        )
756    }
757
758    #[test]
759    fn test_incompatible_reader_writer_pairs() {
760        let incompatible_schemas = vec![
761            // null
762            (Schema::Null, Schema::Int),
763            (Schema::Null, Schema::Long),
764            // boolean
765            (Schema::Boolean, Schema::Int),
766            // int
767            (Schema::Int, Schema::Null),
768            (Schema::Int, Schema::Boolean),
769            (Schema::Int, Schema::Long),
770            (Schema::Int, Schema::Float),
771            (Schema::Int, Schema::Double),
772            // long
773            (Schema::Long, Schema::Float),
774            (Schema::Long, Schema::Double),
775            // float
776            (Schema::Float, Schema::Double),
777            // string
778            (Schema::String, Schema::Boolean),
779            (Schema::String, Schema::Int),
780            // bytes
781            (Schema::Bytes, Schema::Null),
782            (Schema::Bytes, Schema::Int),
783            // logical types
784            (Schema::TimeMicros, Schema::Int),
785            (Schema::TimestampMillis, Schema::Int),
786            (Schema::TimestampMicros, Schema::Int),
787            (Schema::TimestampNanos, Schema::Int),
788            (Schema::LocalTimestampMillis, Schema::Int),
789            (Schema::LocalTimestampMicros, Schema::Int),
790            (Schema::LocalTimestampNanos, Schema::Int),
791            (Schema::Date, Schema::Long),
792            (Schema::TimeMillis, Schema::Long),
793            // array and maps
794            (int_array_schema(), long_array_schema()),
795            (int_map_schema(), int_array_schema()),
796            (int_array_schema(), int_map_schema()),
797            (int_map_schema(), long_map_schema()),
798            // enum
799            (enum1_ab_schema(), enum1_abc_schema()),
800            (enum1_bc_schema(), enum1_abc_schema()),
801            (enum1_ab_schema(), enum2_ab_schema()),
802            (Schema::Int, enum2_ab_schema()),
803            (enum2_ab_schema(), Schema::Int),
804            //union
805            (int_union_schema(), int_string_union_schema()),
806            (string_union_schema(), int_string_union_schema()),
807            //record
808            (empty_record2_schema(), empty_record1_schema()),
809            (a_int_record1_schema(), empty_record1_schema()),
810            (a_int_b_dint_record1_schema(), empty_record1_schema()),
811            (int_list_record_schema(), long_list_record_schema()),
812            (nested_record(), nested_optional_record()),
813        ];
814
815        assert!(
816            incompatible_schemas
817                .iter()
818                .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
819        );
820    }
821
    // Each case is a (writer, reader) pair of schema JSON strings that
    // `SchemaCompatibility::match_schemas` is expected to accept.
    #[rstest]
    // Record type test
    #[case(
        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
        r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
    )]
    // Fixed type test
    #[case(
        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
        r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
    )]
    // Enum type test
    #[case(
        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
        r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
    )]
    // Map type test
    #[case(
        r#"{"type": "map", "values": "int"}"#,
        r#"{"type": "map", "values": "long"}"#
    )]
    // Date type
    #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
    // time-millis type
    #[case(
        r#"{"type": "int"}"#,
        r#"{"type": "int", "logicalType": "time-millis"}"#
    )]
    // time-micros type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "time-micros"}"#
    )]
    // timestamp-nanos type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
    )]
    // timestamp-millis type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "timestamp-millis"}"#
    )]
    // timestamp-micros type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "timestamp-micros"}"#
    )]
    // local-timestamp-millis type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
    )]
    // local-timestamp-micros type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
    )]
    // local-timestamp-nanos type
    #[case(
        r#"{"type": "long"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
    )]
    // Array type test
    #[case(
        r#"{"type": "array", "items": "int"}"#,
        r#"{"type": "array", "items": "long"}"#
    )]
    fn test_avro_3950_match_schemas_ok(
        #[case] writer_schema_str: &str,
        #[case] reader_schema_str: &str,
    ) {
        // Both strings must parse; a parse failure panics before the check runs.
        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();

        assert!(SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).is_ok());
    }
899
    // Each case is a (writer, reader) pair of schema JSON strings plus the exact
    // `CompatibilityError` that `SchemaCompatibility::match_schemas` must return.
    #[rstest]
    // Record type test
    #[case(
        r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
        CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
    )]
    // Fixed type test
    #[case(
        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
        r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
        CompatibilityError::FixedMismatch
    )]
    // Enum type test
    #[case(
        r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
        r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
        CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
    )]
    // Map type test
    #[case(
        r#"{"type":"map", "values": "long"}"#,
        r#"{"type":"map", "values": "int"}"#,
        CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::TimeMicros,
            SchemaKind::TimestampMillis,
            SchemaKind::TimestampMicros,
            SchemaKind::TimestampNanos,
            SchemaKind::LocalTimestampMillis,
            SchemaKind::LocalTimestampMicros,
            SchemaKind::LocalTimestampNanos,
        ]}
    )]
    // Array type test
    #[case(
        r#"{"type": "array", "items": "long"}"#,
        r#"{"type": "array", "items": "int"}"#,
        CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::TimeMicros,
            SchemaKind::TimestampMillis,
            SchemaKind::TimestampMicros,
            SchemaKind::TimestampNanos,
            SchemaKind::LocalTimestampMillis,
            SchemaKind::LocalTimestampMicros,
            SchemaKind::LocalTimestampNanos,
        ]}
    )]
    // Date type test
    #[case(
        r#"{"type": "string"}"#,
        r#"{"type": "int", "logicalType": "date"}"#,
        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::String,
            SchemaKind::Bytes,
            SchemaKind::Uuid,
        ]}
    )]
    // time-millis type
    #[case(
        r#"{"type": "string"}"#,
        r#"{"type": "int", "logicalType": "time-millis"}"#,
        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::String,
            SchemaKind::Bytes,
            SchemaKind::Uuid,
        ]}
    )]
    // time-micros type
    #[case(
        r#"{"type": "int"}"#,
        r#"{"type": "long", "logicalType": "time-micros"}"#,
        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::Int,
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::Date,
            SchemaKind::TimeMillis
        ]}
    )]
    // timestamp-nanos type. This test should fail because it is not supported on schema parse_complex
    // #[case(
    //     r#"{"type": "string"}"#,
    //     r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
    //     CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
    //         SchemaKind::Int,
    //         SchemaKind::Long,
    //         SchemaKind::Float,
    //         SchemaKind::Double,
    //         SchemaKind::Date,
    //         SchemaKind::TimeMillis
    //     ]}
    // )]
    // timestamp-millis type
    #[case(
        r#"{"type": "int"}"#,
        r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::Int,
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::Date,
            SchemaKind::TimeMillis
        ]}
    )]
    // timestamp-micros type
    #[case(
        r#"{"type": "int"}"#,
        r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::Int,
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::Date,
            SchemaKind::TimeMillis
        ]}
    )]
    // local-timestamp-millis type
    #[case(
        r#"{"type": "int"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::Int,
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::Date,
            SchemaKind::TimeMillis
        ]}
    )]
    // local-timestamp-micros type
    #[case(
        r#"{"type": "int"}"#,
        r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
        CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
            SchemaKind::Int,
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::Date,
            SchemaKind::TimeMillis
        ]}
    )]
    // local-timestamp-nanos type. This test should fail because it is not supported on schema parse_complex
    // #[case(
    //     r#"{"type": "int"}"#,
    //     r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
    //     CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
    //         SchemaKind::Int,
    //         SchemaKind::Long,
    //         SchemaKind::Float,
    //         SchemaKind::Double,
    //         SchemaKind::Date,
    //         SchemaKind::TimeMillis
    //     ]}
    // )]
    // When comparing different types we always get Inconclusive
    #[case(
        r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
        r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
        CompatibilityError::Inconclusive(String::from("writers_schema"))
    )]
    fn test_avro_3950_match_schemas_error(
        #[case] writer_schema_str: &str,
        #[case] reader_schema_str: &str,
        #[case] expected_error: CompatibilityError,
    ) {
        // Both strings must parse; a parse failure panics before the check runs.
        let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
        let reader_schema = Schema::parse_str(reader_schema_str).unwrap();

        // `unwrap_err` panics if the schemas unexpectedly match.
        assert_eq!(
            expected_error,
            SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).unwrap_err()
        )
    }
1083
1084    #[test]
1085    fn test_compatible_reader_writer_pairs() {
1086        let uuid_fixed = FixedSchema {
1087            name: Name {
1088                name: String::new(),
1089                namespace: None,
1090            },
1091            aliases: None,
1092            doc: None,
1093            size: 16,
1094            default: None,
1095            attributes: Default::default(),
1096        };
1097
1098        let compatible_schemas = vec![
1099            (Schema::Null, Schema::Null),
1100            (Schema::Long, Schema::Int),
1101            (Schema::Float, Schema::Int),
1102            (Schema::Float, Schema::Long),
1103            (Schema::Double, Schema::Long),
1104            (Schema::Double, Schema::Int),
1105            (Schema::Double, Schema::Float),
1106            (Schema::String, Schema::Bytes),
1107            (Schema::Bytes, Schema::String),
1108            // logical types
1109            (
1110                Schema::Uuid(UuidSchema::String),
1111                Schema::Uuid(UuidSchema::String),
1112            ),
1113            (Schema::Uuid(UuidSchema::String), Schema::String),
1114            (Schema::String, Schema::Uuid(UuidSchema::String)),
1115            (
1116                Schema::Uuid(UuidSchema::Bytes),
1117                Schema::Uuid(UuidSchema::Bytes),
1118            ),
1119            (Schema::Uuid(UuidSchema::Bytes), Schema::Bytes),
1120            (Schema::Bytes, Schema::Uuid(UuidSchema::Bytes)),
1121            (
1122                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1123                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1124            ),
1125            (
1126                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1127                Schema::Fixed(uuid_fixed.clone()),
1128            ),
1129            (
1130                Schema::Fixed(uuid_fixed.clone()),
1131                Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1132            ),
1133            (Schema::Date, Schema::Int),
1134            (Schema::TimeMillis, Schema::Int),
1135            (Schema::TimeMicros, Schema::Long),
1136            (Schema::TimestampMillis, Schema::Long),
1137            (Schema::TimestampMicros, Schema::Long),
1138            (Schema::TimestampNanos, Schema::Long),
1139            (Schema::LocalTimestampMillis, Schema::Long),
1140            (Schema::LocalTimestampMicros, Schema::Long),
1141            (Schema::LocalTimestampNanos, Schema::Long),
1142            (Schema::Int, Schema::Date),
1143            (Schema::Int, Schema::TimeMillis),
1144            (Schema::Long, Schema::TimeMicros),
1145            (Schema::Long, Schema::TimestampMillis),
1146            (Schema::Long, Schema::TimestampMicros),
1147            (Schema::Long, Schema::TimestampNanos),
1148            (Schema::Long, Schema::LocalTimestampMillis),
1149            (Schema::Long, Schema::LocalTimestampMicros),
1150            (Schema::Long, Schema::LocalTimestampNanos),
1151            (int_array_schema(), int_array_schema()),
1152            (long_array_schema(), int_array_schema()),
1153            (int_map_schema(), int_map_schema()),
1154            (long_map_schema(), int_map_schema()),
1155            (enum1_ab_schema(), enum1_ab_schema()),
1156            (enum1_abc_schema(), enum1_ab_schema()),
1157            (empty_union_schema(), empty_union_schema()),
1158            (int_union_schema(), int_union_schema()),
1159            (int_string_union_schema(), string_int_union_schema()),
1160            (int_union_schema(), empty_union_schema()),
1161            (long_union_schema(), int_union_schema()),
1162            (int_union_schema(), Schema::Int),
1163            (Schema::Int, int_union_schema()),
1164            (empty_record1_schema(), empty_record1_schema()),
1165            (empty_record1_schema(), a_int_record1_schema()),
1166            (a_int_record1_schema(), a_int_record1_schema()),
1167            (a_dint_record1_schema(), a_int_record1_schema()),
1168            (a_dint_record1_schema(), a_dint_record1_schema()),
1169            (a_int_record1_schema(), a_dint_record1_schema()),
1170            (a_long_record1_schema(), a_int_record1_schema()),
1171            (a_int_record1_schema(), a_int_b_int_record1_schema()),
1172            (a_dint_record1_schema(), a_int_b_int_record1_schema()),
1173            (a_int_b_dint_record1_schema(), a_int_record1_schema()),
1174            (a_dint_b_dint_record1_schema(), empty_record1_schema()),
1175            (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
1176            (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
1177            (int_list_record_schema(), int_list_record_schema()),
1178            (long_list_record_schema(), long_list_record_schema()),
1179            (long_list_record_schema(), int_list_record_schema()),
1180            (nested_optional_record(), nested_record()),
1181        ];
1182
1183        for (reader, writer) in compatible_schemas {
1184            SchemaCompatibility::can_read(&writer, &reader).unwrap();
1185        }
1186    }
1187
1188    fn writer_schema() -> Schema {
1189        Schema::parse_str(
1190            r#"
1191      {"type":"record", "name":"Record", "fields":[
1192        {"name":"oldfield1", "type":"int"},
1193        {"name":"oldfield2", "type":"string"}
1194      ]}
1195"#,
1196        )
1197        .unwrap()
1198    }
1199
1200    #[test]
1201    fn test_missing_field() -> TestResult {
1202        let reader_schema = Schema::parse_str(
1203            r#"
1204      {"type":"record", "name":"Record", "fields":[
1205        {"name":"oldfield1", "type":"int"}
1206      ]}
1207"#,
1208        )?;
1209        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok());
1210        assert_eq!(
1211            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1212            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1213        );
1214
1215        Ok(())
1216    }
1217
1218    #[test]
1219    fn test_missing_second_field() -> TestResult {
1220        let reader_schema = Schema::parse_str(
1221            r#"
1222        {"type":"record", "name":"Record", "fields":[
1223          {"name":"oldfield2", "type":"string"}
1224        ]}
1225"#,
1226        )?;
1227        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1228        assert_eq!(
1229            CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
1230            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1231        );
1232
1233        Ok(())
1234    }
1235
1236    #[test]
1237    fn test_all_fields() -> TestResult {
1238        let reader_schema = Schema::parse_str(
1239            r#"
1240        {"type":"record", "name":"Record", "fields":[
1241          {"name":"oldfield1", "type":"int"},
1242          {"name":"oldfield2", "type":"string"}
1243        ]}
1244"#,
1245        )?;
1246        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1247        assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok());
1248
1249        Ok(())
1250    }
1251
1252    #[test]
1253    fn test_new_field_with_default() -> TestResult {
1254        let reader_schema = Schema::parse_str(
1255            r#"
1256        {"type":"record", "name":"Record", "fields":[
1257          {"name":"oldfield1", "type":"int"},
1258          {"name":"newfield1", "type":"int", "default":42}
1259        ]}
1260"#,
1261        )?;
1262        assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1263        assert_eq!(
1264            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1265            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1266        );
1267
1268        Ok(())
1269    }
1270
1271    #[test]
1272    fn test_new_field() -> TestResult {
1273        let reader_schema = Schema::parse_str(
1274            r#"
1275        {"type":"record", "name":"Record", "fields":[
1276          {"name":"oldfield1", "type":"int"},
1277          {"name":"newfield1", "type":"int"}
1278        ]}
1279"#,
1280        )?;
1281        assert_eq!(
1282            CompatibilityError::MissingDefaultValue(String::from("newfield1")),
1283            SchemaCompatibility::can_read(&writer_schema(), &reader_schema).unwrap_err()
1284        );
1285        assert_eq!(
1286            CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1287            SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1288        );
1289
1290        Ok(())
1291    }
1292
1293    #[test]
1294    fn test_array_writer_schema() {
1295        let valid_reader = string_array_schema();
1296        let invalid_reader = string_map_schema();
1297
1298        assert!(SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).is_ok());
1299        assert_eq!(
1300            CompatibilityError::Inconclusive(String::from("writers_schema")),
1301            SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader).unwrap_err()
1302        );
1303    }
1304
1305    #[test]
1306    fn test_primitive_writer_schema() {
1307        let valid_reader = Schema::String;
1308        assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok());
1309        assert_eq!(
1310            CompatibilityError::TypeExpected {
1311                schema_type: String::from("readers_schema"),
1312                expected_type: vec![
1313                    SchemaKind::Int,
1314                    SchemaKind::Long,
1315                    SchemaKind::Float,
1316                    SchemaKind::Double,
1317                    SchemaKind::Date,
1318                    SchemaKind::TimeMillis
1319                ],
1320            },
1321            SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err()
1322        );
1323    }
1324
1325    #[test]
1326    fn test_union_reader_writer_subset_incompatibility() {
1327        // reader union schema must contain all writer union branches
1328        let union_writer = union_schema(vec![Schema::Int, Schema::String]);
1329        let union_reader = union_schema(vec![Schema::String]);
1330
1331        assert_eq!(
1332            CompatibilityError::MissingUnionElements,
1333            SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap_err()
1334        );
1335        assert!(SchemaCompatibility::can_read(&union_reader, &union_writer).is_ok());
1336    }
1337
1338    #[test]
1339    fn test_incompatible_record_field() -> TestResult {
1340        let string_schema = Schema::parse_str(
1341            r#"
1342        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1343            {"name":"field1", "type":"string"}
1344        ]}
1345        "#,
1346        )?;
1347
1348        let int_schema = Schema::parse_str(
1349            r#"
1350              {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1351                {"name":"field1", "type":"int"}
1352              ]}
1353        "#,
1354        )?;
1355
1356        assert_eq!(
1357            CompatibilityError::FieldTypeMismatch(
1358                "field1".to_owned(),
1359                Box::new(CompatibilityError::TypeExpected {
1360                    schema_type: "readers_schema".to_owned(),
1361                    expected_type: vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Uuid]
1362                })
1363            ),
1364            SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err()
1365        );
1366
1367        Ok(())
1368    }
1369
1370    #[test]
1371    fn test_enum_symbols() -> TestResult {
1372        let enum_schema1 = Schema::parse_str(
1373            r#"
1374      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
1375"#,
1376        )?;
1377        let enum_schema2 =
1378            Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
1379        assert_eq!(
1380            CompatibilityError::MissingSymbols,
1381            SchemaCompatibility::can_read(&enum_schema2, &enum_schema1).unwrap_err()
1382        );
1383        assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2).is_ok());
1384
1385        Ok(())
1386    }
1387
1388    fn point_2d_schema() -> Schema {
1389        Schema::parse_str(
1390            r#"
1391      {"type":"record", "name":"Point2D", "fields":[
1392        {"name":"x", "type":"double"},
1393        {"name":"y", "type":"double"}
1394      ]}
1395    "#,
1396        )
1397        .unwrap()
1398    }
1399
1400    fn point_2d_fullname_schema() -> Schema {
1401        Schema::parse_str(
1402            r#"
1403      {"type":"record", "name":"Point", "namespace":"written", "fields":[
1404        {"name":"x", "type":"double"},
1405        {"name":"y", "type":"double"}
1406      ]}
1407    "#,
1408        )
1409        .unwrap()
1410    }
1411
1412    fn point_3d_no_default_schema() -> Schema {
1413        Schema::parse_str(
1414            r#"
1415      {"type":"record", "name":"Point", "fields":[
1416        {"name":"x", "type":"double"},
1417        {"name":"y", "type":"double"},
1418        {"name":"z", "type":"double"}
1419      ]}
1420    "#,
1421        )
1422        .unwrap()
1423    }
1424
1425    fn point_3d_schema() -> Schema {
1426        Schema::parse_str(
1427            r#"
1428      {"type":"record", "name":"Point3D", "fields":[
1429        {"name":"x", "type":"double"},
1430        {"name":"y", "type":"double"},
1431        {"name":"z", "type":"double", "default": 0.0}
1432      ]}
1433    "#,
1434        )
1435        .unwrap()
1436    }
1437
1438    fn point_3d_match_name_schema() -> Schema {
1439        Schema::parse_str(
1440            r#"
1441      {"type":"record", "name":"Point", "fields":[
1442        {"name":"x", "type":"double"},
1443        {"name":"y", "type":"double"},
1444        {"name":"z", "type":"double", "default": 0.0}
1445      ]}
1446    "#,
1447        )
1448        .unwrap()
1449    }
1450
1451    #[test]
1452    fn test_union_resolution_no_structure_match() {
1453        // short name match, but no structure match
1454        let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
1455        assert_eq!(
1456            CompatibilityError::MissingUnionElements,
1457            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1458        );
1459    }
1460
1461    #[test]
1462    fn test_union_resolution_first_structure_match_2d() {
1463        // multiple structure matches with no name matches
1464        let read_schema = union_schema(vec![
1465            Schema::Null,
1466            point_3d_no_default_schema(),
1467            point_2d_schema(),
1468            point_3d_schema(),
1469        ]);
1470        assert_eq!(
1471            CompatibilityError::MissingUnionElements,
1472            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1473        );
1474    }
1475
1476    #[test]
1477    fn test_union_resolution_first_structure_match_3d() {
1478        // multiple structure matches with no name matches
1479        let read_schema = union_schema(vec![
1480            Schema::Null,
1481            point_3d_no_default_schema(),
1482            point_3d_schema(),
1483            point_2d_schema(),
1484        ]);
1485        assert_eq!(
1486            CompatibilityError::MissingUnionElements,
1487            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1488        );
1489    }
1490
1491    #[test]
1492    fn test_union_resolution_named_structure_match() {
1493        // multiple structure matches with a short name match
1494        let read_schema = union_schema(vec![
1495            Schema::Null,
1496            point_2d_schema(),
1497            point_3d_match_name_schema(),
1498            point_3d_schema(),
1499        ]);
1500        assert_eq!(
1501            CompatibilityError::MissingUnionElements,
1502            SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1503        );
1504    }
1505
1506    #[test]
1507    fn test_union_resolution_full_name_match() {
1508        // there is a full name match that should be chosen
1509        let read_schema = union_schema(vec![
1510            Schema::Null,
1511            point_2d_schema(),
1512            point_3d_match_name_schema(),
1513            point_3d_schema(),
1514            point_2d_fullname_schema(),
1515        ]);
1516        assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).is_ok());
1517    }
1518
1519    #[test]
1520    fn test_avro_3772_enum_default() -> TestResult {
1521        let writer_raw_schema = r#"
1522        {
1523          "type": "record",
1524          "name": "test",
1525          "fields": [
1526            {"name": "a", "type": "long", "default": 42},
1527            {"name": "b", "type": "string"},
1528            {
1529              "name": "c",
1530              "type": {
1531                "type": "enum",
1532                "name": "suit",
1533                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1534                "default": "spades"
1535              }
1536            }
1537          ]
1538        }
1539        "#;
1540
1541        let reader_raw_schema = r#"
1542        {
1543          "type": "record",
1544          "name": "test",
1545          "fields": [
1546            {"name": "a", "type": "long", "default": 42},
1547            {"name": "b", "type": "string"},
1548            {
1549              "name": "c",
1550              "type": {
1551                 "type": "enum",
1552                 "name": "suit",
1553                 "symbols": ["diamonds", "spades", "ninja", "hearts"],
1554                 "default": "spades"
1555              }
1556            }
1557          ]
1558        }
1559      "#;
1560        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1561        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1562        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
1563        let mut record = Record::new(writer.schema()).unwrap();
1564        record.put("a", 27i64);
1565        record.put("b", "foo");
1566        record.put("c", "clubs");
1567        writer.append(record).unwrap();
1568        let input = writer.into_inner()?;
1569        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1570        assert_eq!(
1571            reader.next().unwrap().unwrap(),
1572            Value::Record(vec![
1573                ("a".to_string(), Value::Long(27)),
1574                ("b".to_string(), Value::String("foo".to_string())),
1575                ("c".to_string(), Value::Enum(1, "spades".to_string())),
1576            ])
1577        );
1578        assert!(reader.next().is_none());
1579
1580        Ok(())
1581    }
1582
1583    #[test]
1584    fn test_avro_3772_enum_default_less_symbols() -> TestResult {
1585        let writer_raw_schema = r#"
1586        {
1587          "type": "record",
1588          "name": "test",
1589          "fields": [
1590            {"name": "a", "type": "long", "default": 42},
1591            {"name": "b", "type": "string"},
1592            {
1593              "name": "c",
1594              "type": {
1595                "type": "enum",
1596                "name": "suit",
1597                "symbols": ["diamonds", "spades", "clubs", "hearts"],
1598                "default": "spades"
1599              }
1600            }
1601          ]
1602        }
1603        "#;
1604
1605        let reader_raw_schema = r#"
1606        {
1607          "type": "record",
1608          "name": "test",
1609          "fields": [
1610            {"name": "a", "type": "long", "default": 42},
1611            {"name": "b", "type": "string"},
1612            {
1613              "name": "c",
1614              "type": {
1615                 "type": "enum",
1616                  "name": "suit",
1617                  "symbols": ["hearts", "spades"],
1618                  "default": "spades"
1619              }
1620            }
1621          ]
1622        }
1623      "#;
1624        let writer_schema = Schema::parse_str(writer_raw_schema)?;
1625        let reader_schema = Schema::parse_str(reader_raw_schema)?;
1626        let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
1627        let mut record = Record::new(writer.schema()).unwrap();
1628        record.put("a", 27i64);
1629        record.put("b", "foo");
1630        record.put("c", "hearts");
1631        writer.append(record).unwrap();
1632        let input = writer.into_inner()?;
1633        let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1634        assert_eq!(
1635            reader.next().unwrap().unwrap(),
1636            Value::Record(vec![
1637                ("a".to_string(), Value::Long(27)),
1638                ("b".to_string(), Value::String("foo".to_string())),
1639                ("c".to_string(), Value::Enum(0, "hearts".to_string())),
1640            ])
1641        );
1642        assert!(reader.next().is_none());
1643
1644        Ok(())
1645    }
1646
1647    #[test]
1648    fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
1649    {
1650        let schema_v1 = Schema::parse_str(
1651            r#"
1652        {
1653            "type": "record",
1654            "name": "Conference",
1655            "namespace": "advdaba",
1656            "fields": [
1657                {"type": "string", "name": "name"},
1658                {"type": "long", "name": "date"}
1659            ]
1660        }"#,
1661        )?;
1662
1663        let schema_v2 = Schema::parse_str(
1664            r#"
1665        {
1666            "type": "record",
1667            "name": "Conference",
1668            "namespace": "advdaba",
1669            "fields": [
1670                {"type": "string", "name": "name"},
1671                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1672            ]
1673        }"#,
1674        )?;
1675
1676        assert!(SchemaCompatibility::mutual_read(&schema_v1, &schema_v2).is_ok());
1677
1678        Ok(())
1679    }
1680
1681    #[test]
1682    fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
1683        let schema_v1 = Schema::parse_str(
1684            r#"
1685        {
1686            "type": "record",
1687            "name": "Conference",
1688            "namespace": "advdaba",
1689            "fields": [
1690                {"type": "string", "name": "name"},
1691                {"type": "long", "name": "date", "aliases" : [ "time" ]}
1692            ]
1693        }"#,
1694        )?;
1695
1696        let schema_v2 = Schema::parse_str(
1697            r#"
1698        {
1699            "type": "record",
1700            "name": "Conference",
1701            "namespace": "advdaba",
1702            "fields": [
1703                {"type": "string", "name": "name"},
1704                {"type": "long", "name": "time"}
1705            ]
1706        }"#,
1707        )?;
1708
1709        assert!(SchemaCompatibility::can_read(&schema_v2, &schema_v1).is_ok());
1710        assert_eq!(
1711            CompatibilityError::MissingDefaultValue(String::from("time")),
1712            SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err()
1713        );
1714
1715        Ok(())
1716    }
1717
1718    #[test]
1719    fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
1720        let schemas = [
1721            // Record schemas
1722            (
1723                Schema::parse_str(
1724                    r#"{
1725              "type": "record",
1726              "name": "Statistics",
1727              "fields": [
1728                { "name": "success", "type": "int" },
1729                { "name": "fail", "type": "int" },
1730                { "name": "time", "type": "string" },
1731                { "name": "max", "type": "int", "default": 0 }
1732              ]
1733            }"#,
1734                )?,
1735                Schema::parse_str(
1736                    r#"{
1737              "type": "record",
1738              "name": "Statistics",
1739              "namespace": "my.namespace",
1740              "fields": [
1741                { "name": "success", "type": "int" },
1742                { "name": "fail", "type": "int" },
1743                { "name": "time", "type": "string" },
1744                { "name": "average", "type": "int", "default": 0}
1745              ]
1746            }"#,
1747                )?,
1748            ),
1749            // Enum schemas
1750            (
1751                Schema::parse_str(
1752                    r#"{
1753                    "type": "enum",
1754                    "name": "Suit",
1755                    "symbols": ["diamonds", "spades", "clubs"]
1756                }"#,
1757                )?,
1758                Schema::parse_str(
1759                    r#"{
1760                    "type": "enum",
1761                    "name": "Suit",
1762                    "namespace": "my.namespace",
1763                    "symbols": ["diamonds", "spades", "clubs", "hearts"]
1764                }"#,
1765                )?,
1766            ),
1767            // Fixed schemas
1768            (
1769                Schema::parse_str(
1770                    r#"{
1771                    "type": "fixed",
1772                    "name": "EmployeeId",
1773                    "size": 16
1774                }"#,
1775                )?,
1776                Schema::parse_str(
1777                    r#"{
1778                    "type": "fixed",
1779                    "name": "EmployeeId",
1780                    "namespace": "my.namespace",
1781                    "size": 16
1782                }"#,
1783                )?,
1784            ),
1785        ];
1786
1787        for (schema_1, schema_2) in schemas {
1788            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1789        }
1790
1791        Ok(())
1792    }
1793
1794    #[test]
1795    fn test_can_read_compatibility_errors() -> TestResult {
1796        let schemas = [
1797            (
1798                Schema::parse_str(
1799                    r#"{
1800                    "type": "record",
1801                    "name": "StatisticsMap",
1802                    "fields": [
1803                        {"name": "average", "type": "int", "default": 0},
1804                        {"name": "success", "type": {"type": "map", "values": "int"}}
1805                    ]
1806                }"#,
1807                )?,
1808                Schema::parse_str(
1809                    r#"{
1810                    "type": "record",
1811                    "name": "StatisticsMap",
1812                    "fields": [
1813                        {"name": "average", "type": "int", "default": 0},
1814                        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
1815                    ]
1816                }"#,
1817                )?,
1818                "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema",
1819            ),
1820            (
1821                Schema::parse_str(
1822                    r#"{
1823                        "type": "record",
1824                        "name": "StatisticsArray",
1825                        "fields": [
1826                            {"name": "max_values", "type": {"type": "array", "items": "int"}}
1827                        ]
1828                    }"#,
1829                )?,
1830                Schema::parse_str(
1831                    r#"{
1832                        "type": "record",
1833                        "name": "StatisticsArray",
1834                        "fields": [
1835                            {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
1836                        ]
1837                    }"#,
1838                )?,
1839                "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema",
1840            ),
1841        ];
1842
1843        for (schema_1, schema_2, error) in schemas {
1844            assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1845            assert_eq!(
1846                error,
1847                SchemaCompatibility::can_read(&schema_2, &schema_1)
1848                    .unwrap_err()
1849                    .to_string()
1850            );
1851        }
1852
1853        Ok(())
1854    }
1855
1856    #[test]
1857    fn avro_3974_can_read_schema_references() -> TestResult {
1858        let schema_strs = vec![
1859            r#"{
1860          "type": "record",
1861          "name": "Child",
1862          "namespace": "avro",
1863          "fields": [
1864            {
1865              "name": "val",
1866              "type": "int"
1867            }
1868          ]
1869        }
1870        "#,
1871            r#"{
1872          "type": "record",
1873          "name": "Parent",
1874          "namespace": "avro",
1875          "fields": [
1876            {
1877              "name": "child",
1878              "type": "avro.Child"
1879            }
1880          ]
1881        }
1882        "#,
1883        ];
1884
1885        let schemas = Schema::parse_list(schema_strs).unwrap();
1886        SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1887
1888        Ok(())
1889    }
1890}