apache_avro/
schema_equality.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{
19    schema::{
20        ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
21        UnionSchema,
22    },
23    Schema,
24};
25use log::{debug, error};
26use std::{fmt::Debug, sync::OnceLock};
27
28/// A trait that compares two schemata for equality.
29/// To register a custom one use [set_schemata_equality_comparator].
30pub trait SchemataEq: Debug + Send + Sync {
31    /// Compares two schemata for equality.
32    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
33}
34
35/// Compares two schemas according to the Avro specification by using
36/// their canonical forms.
37/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
38#[derive(Debug)]
39pub struct SpecificationEq;
40impl SchemataEq for SpecificationEq {
41    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
42        schema_one.canonical_form() == schema_two.canonical_form()
43    }
44}
45
46/// Compares two schemas for equality field by field, using only the fields that
47/// are used to construct their canonical forms.
48/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
49#[derive(Debug)]
50pub struct StructFieldEq {
51    /// Whether to include custom attributes in the comparison.
52    /// The custom attributes are not used to construct the canonical form of the schema!
53    pub include_attributes: bool,
54}
55
56impl SchemataEq for StructFieldEq {
57    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
58        macro_rules! compare_primitive {
59            ($primitive:ident) => {
60                if let Schema::$primitive = schema_one {
61                    if let Schema::$primitive = schema_two {
62                        return true;
63                    }
64                    return false;
65                }
66            };
67        }
68
69        if schema_one.name() != schema_two.name() {
70            return false;
71        }
72
73        compare_primitive!(Null);
74        compare_primitive!(Boolean);
75        compare_primitive!(Int);
76        compare_primitive!(Int);
77        compare_primitive!(Long);
78        compare_primitive!(Float);
79        compare_primitive!(Double);
80        compare_primitive!(Bytes);
81        compare_primitive!(String);
82        compare_primitive!(Uuid);
83        compare_primitive!(BigDecimal);
84        compare_primitive!(Date);
85        compare_primitive!(Duration);
86        compare_primitive!(TimeMicros);
87        compare_primitive!(TimeMillis);
88        compare_primitive!(TimestampMicros);
89        compare_primitive!(TimestampMillis);
90        compare_primitive!(TimestampNanos);
91        compare_primitive!(LocalTimestampMicros);
92        compare_primitive!(LocalTimestampMillis);
93        compare_primitive!(LocalTimestampNanos);
94
95        if self.include_attributes
96            && schema_one.custom_attributes() != schema_two.custom_attributes()
97        {
98            return false;
99        }
100
101        if let Schema::Record(RecordSchema {
102            fields: fields_one, ..
103        }) = schema_one
104        {
105            if let Schema::Record(RecordSchema {
106                fields: fields_two, ..
107            }) = schema_two
108            {
109                return self.compare_fields(fields_one, fields_two);
110            }
111            return false;
112        }
113
114        if let Schema::Enum(EnumSchema {
115            symbols: symbols_one,
116            ..
117        }) = schema_one
118        {
119            if let Schema::Enum(EnumSchema {
120                symbols: symbols_two,
121                ..
122            }) = schema_two
123            {
124                return symbols_one == symbols_two;
125            }
126            return false;
127        }
128
129        if let Schema::Fixed(FixedSchema { size: size_one, .. }) = schema_one {
130            if let Schema::Fixed(FixedSchema { size: size_two, .. }) = schema_two {
131                return size_one == size_two;
132            }
133            return false;
134        }
135
136        if let Schema::Union(UnionSchema {
137            schemas: schemas_one,
138            ..
139        }) = schema_one
140        {
141            if let Schema::Union(UnionSchema {
142                schemas: schemas_two,
143                ..
144            }) = schema_two
145            {
146                return schemas_one.len() == schemas_two.len()
147                    && schemas_one
148                        .iter()
149                        .zip(schemas_two.iter())
150                        .all(|(s1, s2)| self.compare(s1, s2));
151            }
152            return false;
153        }
154
155        if let Schema::Decimal(DecimalSchema {
156            precision: precision_one,
157            scale: scale_one,
158            ..
159        }) = schema_one
160        {
161            if let Schema::Decimal(DecimalSchema {
162                precision: precision_two,
163                scale: scale_two,
164                ..
165            }) = schema_two
166            {
167                return precision_one == precision_two && scale_one == scale_two;
168            }
169            return false;
170        }
171
172        if let Schema::Array(ArraySchema {
173            items: items_one, ..
174        }) = schema_one
175        {
176            if let Schema::Array(ArraySchema {
177                items: items_two, ..
178            }) = schema_two
179            {
180                return items_one == items_two;
181            }
182            return false;
183        }
184
185        if let Schema::Map(MapSchema {
186            types: types_one, ..
187        }) = schema_one
188        {
189            if let Schema::Map(MapSchema {
190                types: types_two, ..
191            }) = schema_two
192            {
193                return self.compare(types_one, types_two);
194            }
195            return false;
196        }
197
198        if let Schema::Ref { name: name_one } = schema_one {
199            if let Schema::Ref { name: name_two } = schema_two {
200                return name_one == name_two;
201            }
202            return false;
203        }
204
205        error!(
206            "This is a bug in schema_equality.rs! The following schemata types are not checked! \
207            Please report it to the Avro library maintainers! \
208            \n{schema_one:?}\n\n{schema_two:?}"
209        );
210        false
211    }
212}
213
214impl StructFieldEq {
215    fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
216        fields_one.len() == fields_two.len()
217            && fields_one
218                .iter()
219                .zip(fields_two.iter())
220                .all(|(f1, f2)| self.compare(&f1.schema, &f2.schema))
221    }
222}
223
224static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
225
226/// Sets a custom schemata equality comparator.
227///
228/// Returns a unit if the registration was successful or the already
229/// registered comparator if the registration failed.
230///
231/// **Note**: This function must be called before parsing any schema because this will
232/// register the default comparator and the registration is one time only!
233pub fn set_schemata_equality_comparator(
234    comparator: Box<dyn SchemataEq>,
235) -> Result<(), Box<dyn SchemataEq>> {
236    debug!("Setting a custom schemata equality comparator: {comparator:?}.");
237    SCHEMATA_COMPARATOR_ONCE.set(comparator)
238}
239
240pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
241    SCHEMATA_COMPARATOR_ONCE
242        .get_or_init(|| {
243            debug!("Going to use the default schemata equality comparator: SpecificationEq.",);
244            Box::new(StructFieldEq {
245                include_attributes: false,
246            })
247        })
248        .compare(schema_one, schema_two)
249}
250
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use crate::schema::{Name, RecordFieldOrder};
    use apache_avro_test_helper::TestResult;
    use serde_json::Value;
    use std::collections::BTreeMap;

    // The two comparators under test. STRUCT_FIELD_EQ ignores custom attributes.
    const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
    const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
        include_attributes: false,
    };

    /// Asserts that neither comparator considers `schema` equal to `Schema::Boolean`.
    #[track_caller]
    fn assert_differs_from_boolean(schema: &Schema) {
        assert!(!SPECIFICATION_EQ.compare(schema, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(schema, &Schema::Boolean));
    }

    /// Asserts that both comparators consider `one` and `two` equal and that
    /// they agree with each other. The messages are used on failure.
    #[track_caller]
    fn assert_equal_for_both(
        one: &Schema,
        two: &Schema,
        specification_msg: &str,
        struct_field_msg: &str,
    ) {
        let by_specification = SPECIFICATION_EQ.compare(one, two);
        let by_struct_field = STRUCT_FIELD_EQ.compare(one, two);
        assert!(by_specification, "{specification_msg}");
        assert!(by_struct_field, "{struct_field_msg}");
        assert_eq!(by_specification, by_struct_field);
    }

    /// Builds a map of booleans that carries a single custom attribute `key: true`.
    fn boolean_map_with_attribute(key: &str) -> Schema {
        Schema::map_with_attributes(
            Schema::Boolean,
            BTreeMap::from_iter([(key.to_string(), Value::Bool(true))]),
        )
    }

    // Generates one test per primitive: both comparators must agree when a
    // primitive schema is compared with itself.
    macro_rules! test_primitives {
        ($primitive:ident) => {
            paste::item! {
                #[test]
                fn [<test_avro_3939_compare_schemata_$primitive>]() {
                    let (left, right) = (Schema::$primitive, Schema::$primitive);
                    assert_eq!(
                        SPECIFICATION_EQ.compare(&left, &right),
                        STRUCT_FIELD_EQ.compare(&left, &right)
                    )
                }
            }
        };
    }

    test_primitives!(Null);
    test_primitives!(Boolean);
    test_primitives!(Int);
    test_primitives!(Long);
    test_primitives!(Float);
    test_primitives!(Double);
    test_primitives!(Bytes);
    test_primitives!(String);
    test_primitives!(Uuid);
    test_primitives!(BigDecimal);
    test_primitives!(Date);
    test_primitives!(Duration);
    test_primitives!(TimeMicros);
    test_primitives!(TimeMillis);
    test_primitives!(TimestampMicros);
    test_primitives!(TimestampMillis);
    test_primitives!(TimestampNanos);
    test_primitives!(LocalTimestampMicros);
    test_primitives!(LocalTimestampMillis);
    test_primitives!(LocalTimestampNanos);

    #[test]
    fn test_avro_3939_compare_named_schemata_with_different_names() {
        let reference_to = |name: &str| Schema::Ref {
            name: Name::from(name),
        };
        let schema_one = reference_to("name1");
        let schema_two = reference_to("name2");

        let by_specification = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        assert!(!by_specification);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(!by_struct_field);

        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_schemata_not_including_attributes() {
        let schema_one = boolean_map_with_attribute("key1");
        let schema_two = boolean_map_with_attribute("key2");
        // STRUCT_FIELD_EQ does not include attributes !
        assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two));
    }

    #[test]
    fn test_avro_3939_compare_schemata_including_attributes() {
        let attribute_aware_eq = StructFieldEq {
            include_attributes: true,
        };
        let schema_one = boolean_map_with_attribute("key1");
        let schema_two = boolean_map_with_attribute("key2");
        assert!(!attribute_aware_eq.compare(&schema_one, &schema_two));
    }

    #[test]
    fn test_avro_3939_compare_map_schemata() {
        let schema_one = Schema::map(Schema::Boolean);
        assert_differs_from_boolean(&schema_one);

        let schema_two = Schema::map(Schema::Boolean);
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Map failed!",
            "StructFieldEq: Equality of two Schema::Map failed!",
        );
    }

    #[test]
    fn test_avro_3939_compare_array_schemata() {
        let schema_one = Schema::array(Schema::Boolean);
        assert_differs_from_boolean(&schema_one);

        let schema_two = Schema::array(Schema::Boolean);
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Array failed!",
            "StructFieldEq: Equality of two Schema::Array failed!",
        );
    }

    #[test]
    fn test_avro_3939_compare_decimal_schemata() {
        let decimal = || {
            Schema::Decimal(DecimalSchema {
                precision: 10,
                scale: 2,
                inner: Box::new(Schema::Bytes),
            })
        };

        let schema_one = decimal();
        assert_differs_from_boolean(&schema_one);

        let schema_two = decimal();
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Decimal failed!",
            "StructFieldEq: Equality of two Schema::Decimal failed!",
        );
    }

    #[test]
    fn test_avro_3939_compare_fixed_schemata() {
        let fixed = || {
            Schema::Fixed(FixedSchema {
                name: Name::from("fixed"),
                doc: None,
                size: 10,
                default: None,
                aliases: None,
                attributes: BTreeMap::new(),
            })
        };

        let schema_one = fixed();
        assert_differs_from_boolean(&schema_one);

        let schema_two = fixed();
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Fixed failed!",
            "StructFieldEq: Equality of two Schema::Fixed failed!",
        );
    }

    #[test]
    fn test_avro_3939_compare_enum_schemata() {
        let enumeration = || {
            Schema::Enum(EnumSchema {
                name: Name::from("enum"),
                doc: None,
                symbols: vec!["A".to_string(), "B".to_string()],
                default: None,
                aliases: None,
                attributes: BTreeMap::new(),
            })
        };

        let schema_one = enumeration();
        assert_differs_from_boolean(&schema_one);

        let schema_two = enumeration();
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Enum failed!",
            "StructFieldEq: Equality of two Schema::Enum failed!",
        );
    }

    #[test]
    fn test_avro_3939_compare_ref_schemata() {
        let reference = || Schema::Ref {
            name: Name::from("ref"),
        };

        let schema_one = reference();
        assert_differs_from_boolean(&schema_one);

        let schema_two = reference();
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Ref failed!",
            "StructFieldEq: Equality of two Schema::Ref failed!",
        );
    }

    #[test]
    fn test_avro_3939_compare_record_schemata() {
        let record = || {
            Schema::Record(RecordSchema {
                name: Name::from("record"),
                doc: None,
                fields: vec![RecordField {
                    name: "field".to_string(),
                    doc: None,
                    default: None,
                    schema: Schema::Boolean,
                    order: RecordFieldOrder::Ignore,
                    aliases: None,
                    custom_attributes: BTreeMap::new(),
                    position: 0,
                }],
                aliases: None,
                attributes: BTreeMap::new(),
                lookup: Default::default(),
            })
        };

        let schema_one = record();
        assert_differs_from_boolean(&schema_one);

        let schema_two = record();
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Record failed!",
            "StructFieldEq: Equality of two Schema::Record failed!",
        );
    }

    #[test]
    fn test_avro_3939_compare_union_schemata() -> TestResult {
        let union = || UnionSchema::new(vec![Schema::Boolean, Schema::Int]).map(Schema::Union);

        let schema_one = union()?;
        assert_differs_from_boolean(&schema_one);

        let schema_two = union()?;
        assert_equal_for_both(
            &schema_one,
            &schema_two,
            "SpecificationEq: Equality of two Schema::Union failed!",
            "StructFieldEq: Equality of two Schema::Union failed!",
        );
        Ok(())
    }
}