apache_avro/
schema_equality.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{
19    schema::{
20        ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
21        UnionSchema,
22    },
23    Schema,
24};
25use log::{debug, error};
26use std::{fmt::Debug, sync::OnceLock};
27
/// A trait that compares two schemata for equality.
///
/// Implementations have to be [`Debug`], [`Send`] and [`Sync`].
/// To register a custom one use [`set_schemata_equality_comparator`].
pub trait SchemataEq: Debug + Send + Sync {
    /// Compares two schemata for equality.
    ///
    /// Returns `true` if this comparator considers `schema_one` and
    /// `schema_two` equal and `false` otherwise.
    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
}
34
35/// Compares two schemas according to the Avro specification by using
36/// their canonical forms.
37/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
38#[derive(Debug)]
39pub struct SpecificationEq;
40impl SchemataEq for SpecificationEq {
41    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
42        schema_one.canonical_form() == schema_two.canonical_form()
43    }
44}
45
/// Compares two schemas for equality field by field, using only the fields that
/// are used to construct their canonical forms.
/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
///
/// Optionally the custom attributes can take part in the comparison too,
/// see [`StructFieldEq::include_attributes`].
#[derive(Debug)]
pub struct StructFieldEq {
    /// Whether to include custom attributes in the comparison.
    /// The custom attributes are not used to construct the canonical form of the schema!
    pub include_attributes: bool,
}
55
56impl SchemataEq for StructFieldEq {
57    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
58        macro_rules! compare_primitive {
59            ($primitive:ident) => {
60                if let Schema::$primitive = schema_one {
61                    if let Schema::$primitive = schema_two {
62                        return true;
63                    }
64                    return false;
65                }
66            };
67        }
68
69        if schema_one.name() != schema_two.name() {
70            return false;
71        }
72
73        compare_primitive!(Null);
74        compare_primitive!(Boolean);
75        compare_primitive!(Int);
76        compare_primitive!(Int);
77        compare_primitive!(Long);
78        compare_primitive!(Float);
79        compare_primitive!(Double);
80        compare_primitive!(Bytes);
81        compare_primitive!(String);
82        compare_primitive!(Uuid);
83        compare_primitive!(BigDecimal);
84        compare_primitive!(Date);
85        compare_primitive!(Duration);
86        compare_primitive!(TimeMicros);
87        compare_primitive!(TimeMillis);
88        compare_primitive!(TimestampMicros);
89        compare_primitive!(TimestampMillis);
90        compare_primitive!(TimestampNanos);
91        compare_primitive!(LocalTimestampMicros);
92        compare_primitive!(LocalTimestampMillis);
93        compare_primitive!(LocalTimestampNanos);
94
95        if self.include_attributes
96            && schema_one.custom_attributes() != schema_two.custom_attributes()
97        {
98            return false;
99        }
100
101        if let Schema::Record(RecordSchema {
102            fields: fields_one, ..
103        }) = schema_one
104        {
105            if let Schema::Record(RecordSchema {
106                fields: fields_two, ..
107            }) = schema_two
108            {
109                return self.compare_fields(fields_one, fields_two);
110            }
111            return false;
112        }
113
114        if let Schema::Enum(EnumSchema {
115            symbols: symbols_one,
116            ..
117        }) = schema_one
118        {
119            if let Schema::Enum(EnumSchema {
120                symbols: symbols_two,
121                ..
122            }) = schema_two
123            {
124                return symbols_one == symbols_two;
125            }
126            return false;
127        }
128
129        if let Schema::Fixed(FixedSchema { size: size_one, .. }) = schema_one {
130            if let Schema::Fixed(FixedSchema { size: size_two, .. }) = schema_two {
131                return size_one == size_two;
132            }
133            return false;
134        }
135
136        if let Schema::Union(UnionSchema {
137            schemas: schemas_one,
138            ..
139        }) = schema_one
140        {
141            if let Schema::Union(UnionSchema {
142                schemas: schemas_two,
143                ..
144            }) = schema_two
145            {
146                return schemas_one.len() == schemas_two.len()
147                    && schemas_one
148                        .iter()
149                        .zip(schemas_two.iter())
150                        .all(|(s1, s2)| self.compare(s1, s2));
151            }
152            return false;
153        }
154
155        if let Schema::Decimal(DecimalSchema {
156            precision: precision_one,
157            scale: scale_one,
158            ..
159        }) = schema_one
160        {
161            if let Schema::Decimal(DecimalSchema {
162                precision: precision_two,
163                scale: scale_two,
164                ..
165            }) = schema_two
166            {
167                return precision_one == precision_two && scale_one == scale_two;
168            }
169            return false;
170        }
171
172        if let Schema::Array(ArraySchema {
173            items: items_one, ..
174        }) = schema_one
175        {
176            if let Schema::Array(ArraySchema {
177                items: items_two, ..
178            }) = schema_two
179            {
180                return items_one == items_two;
181            }
182            return false;
183        }
184
185        if let Schema::Map(MapSchema {
186            types: types_one, ..
187        }) = schema_one
188        {
189            if let Schema::Map(MapSchema {
190                types: types_two, ..
191            }) = schema_two
192            {
193                return self.compare(types_one, types_two);
194            }
195            return false;
196        }
197
198        if let Schema::Ref { name: name_one } = schema_one {
199            if let Schema::Ref { name: name_two } = schema_two {
200                return name_one == name_two;
201            }
202            return false;
203        }
204
205        error!(
206            "This is a bug in schema_equality.rs! The following schemata types are not checked! \
207            Please report it to the Avro library maintainers! \
208            \n{:?}\n\n{:?}",
209            schema_one, schema_two
210        );
211        false
212    }
213}
214
215impl StructFieldEq {
216    fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
217        fields_one.len() == fields_two.len()
218            && fields_one
219                .iter()
220                .zip(fields_two.iter())
221                .all(|(f1, f2)| self.compare(&f1.schema, &f2.schema))
222    }
223}
224
/// The schemata equality comparator shared by the whole crate.
/// It is initialized at most once: either explicitly by
/// `set_schemata_equality_comparator` or lazily by `compare_schemata`.
static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
226
227/// Sets a custom schemata equality comparator.
228///
229/// Returns a unit if the registration was successful or the already
230/// registered comparator if the registration failed.
231///
232/// **Note**: This function must be called before parsing any schema because this will
233/// register the default comparator and the registration is one time only!
234pub fn set_schemata_equality_comparator(
235    comparator: Box<dyn SchemataEq>,
236) -> Result<(), Box<dyn SchemataEq>> {
237    debug!(
238        "Setting a custom schemata equality comparator: {:?}.",
239        comparator
240    );
241    SCHEMATA_COMPARATOR_ONCE.set(comparator)
242}
243
244pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
245    SCHEMATA_COMPARATOR_ONCE
246        .get_or_init(|| {
247            debug!("Going to use the default schemata equality comparator: SpecificationEq.",);
248            Box::new(StructFieldEq {
249                include_attributes: false,
250            })
251        })
252        .compare(schema_one, schema_two)
253}
254
255#[cfg(test)]
256#[allow(non_snake_case)]
257mod tests {
258    use super::*;
259    use crate::schema::{Name, RecordFieldOrder};
260    use apache_avro_test_helper::TestResult;
261    use serde_json::Value;
262    use std::collections::BTreeMap;
263
    // Comparator instances shared by the tests in this module.
    const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
    // This instance does not take the custom attributes into account.
    const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
        include_attributes: false,
    };
268
269    macro_rules! test_primitives {
270        ($primitive:ident) => {
271            paste::item! {
272                #[test]
273                fn [<test_avro_3939_compare_schemata_$primitive>]() {
274                    let specification_eq_res = SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
275                    let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
276                    assert_eq!(specification_eq_res, struct_field_eq_res)
277                }
278            }
279        };
280    }
281
282    test_primitives!(Null);
283    test_primitives!(Boolean);
284    test_primitives!(Int);
285    test_primitives!(Long);
286    test_primitives!(Float);
287    test_primitives!(Double);
288    test_primitives!(Bytes);
289    test_primitives!(String);
290    test_primitives!(Uuid);
291    test_primitives!(BigDecimal);
292    test_primitives!(Date);
293    test_primitives!(Duration);
294    test_primitives!(TimeMicros);
295    test_primitives!(TimeMillis);
296    test_primitives!(TimestampMicros);
297    test_primitives!(TimestampMillis);
298    test_primitives!(TimestampNanos);
299    test_primitives!(LocalTimestampMicros);
300    test_primitives!(LocalTimestampMillis);
301    test_primitives!(LocalTimestampNanos);
302
303    #[test]
304    fn test_avro_3939_compare_named_schemata_with_different_names() {
305        let schema_one = Schema::Ref {
306            name: Name::from("name1"),
307        };
308
309        let schema_two = Schema::Ref {
310            name: Name::from("name2"),
311        };
312
313        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
314        assert!(!specification_eq_res);
315        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
316        assert!(!struct_field_eq_res);
317
318        assert_eq!(specification_eq_res, struct_field_eq_res);
319    }
320
321    #[test]
322    fn test_avro_3939_compare_schemata_not_including_attributes() {
323        let schema_one = Schema::map_with_attributes(
324            Schema::Boolean,
325            BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
326        );
327        let schema_two = Schema::map_with_attributes(
328            Schema::Boolean,
329            BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
330        );
331        // STRUCT_FIELD_EQ does not include attributes !
332        assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two));
333    }
334
335    #[test]
336    fn test_avro_3939_compare_schemata_including_attributes() {
337        let struct_field_eq = StructFieldEq {
338            include_attributes: true,
339        };
340        let schema_one = Schema::map_with_attributes(
341            Schema::Boolean,
342            BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
343        );
344        let schema_two = Schema::map_with_attributes(
345            Schema::Boolean,
346            BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
347        );
348        assert!(!struct_field_eq.compare(&schema_one, &schema_two));
349    }
350
351    #[test]
352    fn test_avro_3939_compare_map_schemata() {
353        let schema_one = Schema::map(Schema::Boolean);
354        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
355        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
356
357        let schema_two = Schema::map(Schema::Boolean);
358
359        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
360        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
361        assert!(
362            specification_eq_res,
363            "SpecificationEq: Equality of two Schema::Map failed!"
364        );
365        assert!(
366            struct_field_eq_res,
367            "StructFieldEq: Equality of two Schema::Map failed!"
368        );
369        assert_eq!(specification_eq_res, struct_field_eq_res);
370    }
371
372    #[test]
373    fn test_avro_3939_compare_array_schemata() {
374        let schema_one = Schema::array(Schema::Boolean);
375        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
376        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
377
378        let schema_two = Schema::array(Schema::Boolean);
379
380        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
381        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
382        assert!(
383            specification_eq_res,
384            "SpecificationEq: Equality of two Schema::Array failed!"
385        );
386        assert!(
387            struct_field_eq_res,
388            "StructFieldEq: Equality of two Schema::Array failed!"
389        );
390        assert_eq!(specification_eq_res, struct_field_eq_res);
391    }
392
393    #[test]
394    fn test_avro_3939_compare_decimal_schemata() {
395        let schema_one = Schema::Decimal(DecimalSchema {
396            precision: 10,
397            scale: 2,
398            inner: Box::new(Schema::Bytes),
399        });
400        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
401        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
402
403        let schema_two = Schema::Decimal(DecimalSchema {
404            precision: 10,
405            scale: 2,
406            inner: Box::new(Schema::Bytes),
407        });
408
409        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
410        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
411        assert!(
412            specification_eq_res,
413            "SpecificationEq: Equality of two Schema::Decimal failed!"
414        );
415        assert!(
416            struct_field_eq_res,
417            "StructFieldEq: Equality of two Schema::Decimal failed!"
418        );
419        assert_eq!(specification_eq_res, struct_field_eq_res);
420    }
421
422    #[test]
423    fn test_avro_3939_compare_fixed_schemata() {
424        let schema_one = Schema::Fixed(FixedSchema {
425            name: Name::from("fixed"),
426            doc: None,
427            size: 10,
428            default: None,
429            aliases: None,
430            attributes: BTreeMap::new(),
431        });
432        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
433        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
434
435        let schema_two = Schema::Fixed(FixedSchema {
436            name: Name::from("fixed"),
437            doc: None,
438            size: 10,
439            default: None,
440            aliases: None,
441            attributes: BTreeMap::new(),
442        });
443
444        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
445        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
446        assert!(
447            specification_eq_res,
448            "SpecificationEq: Equality of two Schema::Fixed failed!"
449        );
450        assert!(
451            struct_field_eq_res,
452            "StructFieldEq: Equality of two Schema::Fixed failed!"
453        );
454        assert_eq!(specification_eq_res, struct_field_eq_res);
455    }
456
457    #[test]
458    fn test_avro_3939_compare_enum_schemata() {
459        let schema_one = Schema::Enum(EnumSchema {
460            name: Name::from("enum"),
461            doc: None,
462            symbols: vec!["A".to_string(), "B".to_string()],
463            default: None,
464            aliases: None,
465            attributes: BTreeMap::new(),
466        });
467        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
468        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
469
470        let schema_two = Schema::Enum(EnumSchema {
471            name: Name::from("enum"),
472            doc: None,
473            symbols: vec!["A".to_string(), "B".to_string()],
474            default: None,
475            aliases: None,
476            attributes: BTreeMap::new(),
477        });
478
479        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
480        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
481        assert!(
482            specification_eq_res,
483            "SpecificationEq: Equality of two Schema::Enum failed!"
484        );
485        assert!(
486            struct_field_eq_res,
487            "StructFieldEq: Equality of two Schema::Enum failed!"
488        );
489        assert_eq!(specification_eq_res, struct_field_eq_res);
490    }
491
492    #[test]
493    fn test_avro_3939_compare_ref_schemata() {
494        let schema_one = Schema::Ref {
495            name: Name::from("ref"),
496        };
497        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
498        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
499
500        let schema_two = Schema::Ref {
501            name: Name::from("ref"),
502        };
503
504        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
505        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
506        assert!(
507            specification_eq_res,
508            "SpecificationEq: Equality of two Schema::Ref failed!"
509        );
510        assert!(
511            struct_field_eq_res,
512            "StructFieldEq: Equality of two Schema::Ref failed!"
513        );
514        assert_eq!(specification_eq_res, struct_field_eq_res);
515    }
516
517    #[test]
518    fn test_avro_3939_compare_record_schemata() {
519        let schema_one = Schema::Record(RecordSchema {
520            name: Name::from("record"),
521            doc: None,
522            fields: vec![RecordField {
523                name: "field".to_string(),
524                doc: None,
525                default: None,
526                schema: Schema::Boolean,
527                order: RecordFieldOrder::Ignore,
528                aliases: None,
529                custom_attributes: BTreeMap::new(),
530                position: 0,
531            }],
532            aliases: None,
533            attributes: BTreeMap::new(),
534            lookup: Default::default(),
535        });
536        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
537        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
538
539        let schema_two = Schema::Record(RecordSchema {
540            name: Name::from("record"),
541            doc: None,
542            fields: vec![RecordField {
543                name: "field".to_string(),
544                doc: None,
545                default: None,
546                schema: Schema::Boolean,
547                order: RecordFieldOrder::Ignore,
548                aliases: None,
549                custom_attributes: BTreeMap::new(),
550                position: 0,
551            }],
552            aliases: None,
553            attributes: BTreeMap::new(),
554            lookup: Default::default(),
555        });
556
557        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
558        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
559        assert!(
560            specification_eq_res,
561            "SpecificationEq: Equality of two Schema::Record failed!"
562        );
563        assert!(
564            struct_field_eq_res,
565            "StructFieldEq: Equality of two Schema::Record failed!"
566        );
567        assert_eq!(specification_eq_res, struct_field_eq_res);
568    }
569
570    #[test]
571    fn test_avro_3939_compare_union_schemata() -> TestResult {
572        let schema_one = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
573        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
574        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
575
576        let schema_two = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
577
578        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
579        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
580        assert!(
581            specification_eq_res,
582            "SpecificationEq: Equality of two Schema::Union failed!"
583        );
584        assert!(
585            struct_field_eq_res,
586            "StructFieldEq: Equality of two Schema::Union failed!"
587        );
588        assert_eq!(specification_eq_res, struct_field_eq_res);
589        Ok(())
590    }
591}