apache_avro/
schema_equality.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{
19    Schema,
20    schema::{
21        ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
22        UnionSchema,
23    },
24};
25use log::debug;
26use std::{fmt::Debug, sync::OnceLock};
27
28/// A trait that compares two schemata for equality.
29/// To register a custom one use [set_schemata_equality_comparator].
30pub trait SchemataEq: Debug + Send + Sync {
31    /// Compares two schemata for equality.
32    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
33}
34
35/// Compares two schemas according to the Avro specification by using
36/// their canonical forms.
37/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
38#[derive(Debug)]
39pub struct SpecificationEq;
40impl SchemataEq for SpecificationEq {
41    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
42        schema_one.canonical_form() == schema_two.canonical_form()
43    }
44}
45
/// Schema comparator that walks both schemas and compares them field by
/// field, looking only at the fields that contribute to their canonical forms.
/// See <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
#[derive(Debug)]
pub struct StructFieldEq {
    /// When `true`, the custom attributes take part in the comparison as well.
    /// Keep in mind that custom attributes are *not* part of the canonical form of a schema!
    pub include_attributes: bool,
}
55
56impl SchemataEq for StructFieldEq {
57    #[rustfmt::skip]
58    fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
59        if schema_one.name() != schema_two.name() {
60            return false;
61        }
62
63        if self.include_attributes
64            && schema_one.custom_attributes() != schema_two.custom_attributes()
65        {
66            return false;
67        }
68
69        match (schema_one, schema_two) {
70            (Schema::Null, Schema::Null) => true,
71            (Schema::Null, _) => false,
72            (Schema::Boolean, Schema::Boolean) => true,
73            (Schema::Boolean, _) => false,
74            (Schema::Int, Schema::Int) => true,
75            (Schema::Int, _) => false,
76            (Schema::Long, Schema::Long) => true,
77            (Schema::Long, _) => false,
78            (Schema::Float, Schema::Float) => true,
79            (Schema::Float, _) => false,
80            (Schema::Double, Schema::Double) => true,
81            (Schema::Double, _) => false,
82            (Schema::Bytes, Schema::Bytes) => true,
83            (Schema::Bytes, _) => false,
84            (Schema::String, Schema::String) => true,
85            (Schema::String, _) => false,
86            (Schema::Uuid, Schema::Uuid) => true,
87            (Schema::Uuid, _) => false,
88            (Schema::BigDecimal, Schema::BigDecimal) => true,
89            (Schema::BigDecimal, _) => false,
90            (Schema::Date, Schema::Date) => true,
91            (Schema::Date, _) => false,
92            (Schema::Duration, Schema::Duration) => true,
93            (Schema::Duration, _) => false,
94            (Schema::TimeMicros, Schema::TimeMicros) => true,
95            (Schema::TimeMicros, _) => false,
96            (Schema::TimeMillis, Schema::TimeMillis) => true,
97            (Schema::TimeMillis, _) => false,
98            (Schema::TimestampMicros, Schema::TimestampMicros) => true,
99            (Schema::TimestampMicros, _) => false,
100            (Schema::TimestampMillis, Schema::TimestampMillis) => true,
101            (Schema::TimestampMillis, _) => false,
102            (Schema::TimestampNanos, Schema::TimestampNanos) => true,
103            (Schema::TimestampNanos, _) => false,
104            (Schema::LocalTimestampMicros, Schema::LocalTimestampMicros) => true,
105            (Schema::LocalTimestampMicros, _) => false,
106            (Schema::LocalTimestampMillis, Schema::LocalTimestampMillis) => true,
107            (Schema::LocalTimestampMillis, _) => false,
108            (Schema::LocalTimestampNanos, Schema::LocalTimestampNanos) => true,
109            (Schema::LocalTimestampNanos, _) => false,
110            (
111                Schema::Record(RecordSchema { fields: fields_one, ..}),
112                Schema::Record(RecordSchema { fields: fields_two, ..})
113            ) => {
114                self.compare_fields(fields_one, fields_two)
115            }
116            (Schema::Record(_), _) => false,
117            (
118                Schema::Enum(EnumSchema { symbols: symbols_one, ..}),
119                Schema::Enum(EnumSchema { symbols: symbols_two, .. })
120            ) => {
121                symbols_one == symbols_two
122            }
123            (Schema::Enum(_), _) => false,
124            (
125                Schema::Fixed(FixedSchema { size: size_one, ..}),
126                Schema::Fixed(FixedSchema { size: size_two, .. })
127            ) => {
128                size_one == size_two
129            }
130            (Schema::Fixed(_), _) => false,
131            (
132                Schema::Union(UnionSchema { schemas: schemas_one, ..}),
133                Schema::Union(UnionSchema { schemas: schemas_two, .. })
134            ) => {
135                schemas_one.len() == schemas_two.len()
136                    && schemas_one
137                    .iter()
138                    .zip(schemas_two.iter())
139                    .all(|(s1, s2)| self.compare(s1, s2))
140            }
141            (Schema::Union(_), _) => false,
142            (
143                Schema::Decimal(DecimalSchema { precision: precision_one, scale: scale_one, inner: inner_one }),
144                Schema::Decimal(DecimalSchema { precision: precision_two, scale: scale_two, inner: inner_two })
145            ) => {
146                precision_one == precision_two && scale_one == scale_two && self.compare(inner_one, inner_two)
147            }
148            (Schema::Decimal(_), _) => false,
149            (
150                Schema::Array(ArraySchema { items: items_one, ..}),
151                Schema::Array(ArraySchema { items: items_two, ..})
152            ) => {
153                self.compare(items_one, items_two)
154            }
155            (Schema::Array(_), _) => false,
156            (
157                Schema::Map(MapSchema { types: types_one, ..}),
158                Schema::Map(MapSchema { types: types_two, ..})
159            ) => {
160                self.compare(types_one, types_two)
161            }
162            (Schema::Map(_), _) => false,
163            (
164                Schema::Ref { name: name_one },
165                Schema::Ref { name: name_two }
166            ) => {
167                name_one == name_two
168            }
169            (Schema::Ref { .. }, _) => false,
170        }
171    }
172}
173
174impl StructFieldEq {
175    fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
176        fields_one.len() == fields_two.len()
177            && fields_one
178                .iter()
179                .zip(fields_two.iter())
180                .all(|(f1, f2)| f1.name == f2.name && self.compare(&f1.schema, &f2.schema))
181    }
182}
183
184static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
185
186/// Sets a custom schemata equality comparator.
187///
188/// Returns a unit if the registration was successful or the already
189/// registered comparator if the registration failed.
190///
191/// **Note**: This function must be called before parsing any schema because this will
192/// register the default comparator and the registration is one time only!
193pub fn set_schemata_equality_comparator(
194    comparator: Box<dyn SchemataEq>,
195) -> Result<(), Box<dyn SchemataEq>> {
196    debug!("Setting a custom schemata equality comparator: {comparator:?}.");
197    SCHEMATA_COMPARATOR_ONCE.set(comparator)
198}
199
200pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
201    SCHEMATA_COMPARATOR_ONCE
202        .get_or_init(|| {
203            debug!("Going to use the default schemata equality comparator: StructFieldEq.",);
204            Box::new(StructFieldEq {
205                include_attributes: false,
206            })
207        })
208        .compare(schema_one, schema_two)
209}
210
211#[cfg(test)]
212#[allow(non_snake_case)]
213mod tests {
214    use super::*;
215    use crate::schema::{Name, RecordFieldOrder};
216    use apache_avro_test_helper::TestResult;
217    use serde_json::Value;
218    use std::collections::BTreeMap;
219
220    const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
221    const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
222        include_attributes: false,
223    };
224
225    macro_rules! test_primitives {
226        ($primitive:ident) => {
227            paste::item! {
228                #[test]
229                fn [<test_avro_3939_compare_schemata_$primitive>]() {
230                    let specification_eq_res = SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
231                    let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
232                    assert_eq!(specification_eq_res, struct_field_eq_res)
233                }
234            }
235        };
236    }
237
238    test_primitives!(Null);
239    test_primitives!(Boolean);
240    test_primitives!(Int);
241    test_primitives!(Long);
242    test_primitives!(Float);
243    test_primitives!(Double);
244    test_primitives!(Bytes);
245    test_primitives!(String);
246    test_primitives!(Uuid);
247    test_primitives!(BigDecimal);
248    test_primitives!(Date);
249    test_primitives!(Duration);
250    test_primitives!(TimeMicros);
251    test_primitives!(TimeMillis);
252    test_primitives!(TimestampMicros);
253    test_primitives!(TimestampMillis);
254    test_primitives!(TimestampNanos);
255    test_primitives!(LocalTimestampMicros);
256    test_primitives!(LocalTimestampMillis);
257    test_primitives!(LocalTimestampNanos);
258
259    #[test]
260    fn test_avro_3939_compare_named_schemata_with_different_names() {
261        let schema_one = Schema::Ref {
262            name: Name::from("name1"),
263        };
264
265        let schema_two = Schema::Ref {
266            name: Name::from("name2"),
267        };
268
269        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
270        assert!(!specification_eq_res);
271        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
272        assert!(!struct_field_eq_res);
273
274        assert_eq!(specification_eq_res, struct_field_eq_res);
275    }
276
277    #[test]
278    fn test_avro_3939_compare_schemata_not_including_attributes() {
279        let schema_one = Schema::map_with_attributes(
280            Schema::Boolean,
281            BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
282        );
283        let schema_two = Schema::map_with_attributes(
284            Schema::Boolean,
285            BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
286        );
287        // STRUCT_FIELD_EQ does not include attributes !
288        assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two));
289    }
290
291    #[test]
292    fn test_avro_3939_compare_schemata_including_attributes() {
293        let struct_field_eq = StructFieldEq {
294            include_attributes: true,
295        };
296        let schema_one = Schema::map_with_attributes(
297            Schema::Boolean,
298            BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
299        );
300        let schema_two = Schema::map_with_attributes(
301            Schema::Boolean,
302            BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
303        );
304        assert!(!struct_field_eq.compare(&schema_one, &schema_two));
305    }
306
307    #[test]
308    fn test_avro_3939_compare_map_schemata() {
309        let schema_one = Schema::map(Schema::Boolean);
310        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
311        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
312
313        let schema_two = Schema::map(Schema::Boolean);
314
315        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
316        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
317        assert!(
318            specification_eq_res,
319            "SpecificationEq: Equality of two Schema::Map failed!"
320        );
321        assert!(
322            struct_field_eq_res,
323            "StructFieldEq: Equality of two Schema::Map failed!"
324        );
325        assert_eq!(specification_eq_res, struct_field_eq_res);
326    }
327
328    #[test]
329    fn test_avro_3939_compare_array_schemata() {
330        let schema_one = Schema::array(Schema::Boolean);
331        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
332        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
333
334        let schema_two = Schema::array(Schema::Boolean);
335
336        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
337        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
338        assert!(
339            specification_eq_res,
340            "SpecificationEq: Equality of two Schema::Array failed!"
341        );
342        assert!(
343            struct_field_eq_res,
344            "StructFieldEq: Equality of two Schema::Array failed!"
345        );
346        assert_eq!(specification_eq_res, struct_field_eq_res);
347    }
348
349    #[test]
350    fn test_avro_3939_compare_decimal_schemata() {
351        let schema_one = Schema::Decimal(DecimalSchema {
352            precision: 10,
353            scale: 2,
354            inner: Box::new(Schema::Bytes),
355        });
356        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
357        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
358
359        let schema_two = Schema::Decimal(DecimalSchema {
360            precision: 10,
361            scale: 2,
362            inner: Box::new(Schema::Bytes),
363        });
364
365        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
366        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
367        assert!(
368            specification_eq_res,
369            "SpecificationEq: Equality of two Schema::Decimal failed!"
370        );
371        assert!(
372            struct_field_eq_res,
373            "StructFieldEq: Equality of two Schema::Decimal failed!"
374        );
375        assert_eq!(specification_eq_res, struct_field_eq_res);
376    }
377
378    #[test]
379    fn test_avro_3939_compare_fixed_schemata() {
380        let schema_one = Schema::Fixed(FixedSchema {
381            name: Name::from("fixed"),
382            doc: None,
383            size: 10,
384            default: None,
385            aliases: None,
386            attributes: BTreeMap::new(),
387        });
388        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
389        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
390
391        let schema_two = Schema::Fixed(FixedSchema {
392            name: Name::from("fixed"),
393            doc: None,
394            size: 10,
395            default: None,
396            aliases: None,
397            attributes: BTreeMap::new(),
398        });
399
400        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
401        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
402        assert!(
403            specification_eq_res,
404            "SpecificationEq: Equality of two Schema::Fixed failed!"
405        );
406        assert!(
407            struct_field_eq_res,
408            "StructFieldEq: Equality of two Schema::Fixed failed!"
409        );
410        assert_eq!(specification_eq_res, struct_field_eq_res);
411    }
412
413    #[test]
414    fn test_avro_3939_compare_enum_schemata() {
415        let schema_one = Schema::Enum(EnumSchema {
416            name: Name::from("enum"),
417            doc: None,
418            symbols: vec!["A".to_string(), "B".to_string()],
419            default: None,
420            aliases: None,
421            attributes: BTreeMap::new(),
422        });
423        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
424        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
425
426        let schema_two = Schema::Enum(EnumSchema {
427            name: Name::from("enum"),
428            doc: None,
429            symbols: vec!["A".to_string(), "B".to_string()],
430            default: None,
431            aliases: None,
432            attributes: BTreeMap::new(),
433        });
434
435        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
436        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
437        assert!(
438            specification_eq_res,
439            "SpecificationEq: Equality of two Schema::Enum failed!"
440        );
441        assert!(
442            struct_field_eq_res,
443            "StructFieldEq: Equality of two Schema::Enum failed!"
444        );
445        assert_eq!(specification_eq_res, struct_field_eq_res);
446    }
447
448    #[test]
449    fn test_avro_3939_compare_ref_schemata() {
450        let schema_one = Schema::Ref {
451            name: Name::from("ref"),
452        };
453        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
454        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
455
456        let schema_two = Schema::Ref {
457            name: Name::from("ref"),
458        };
459
460        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
461        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
462        assert!(
463            specification_eq_res,
464            "SpecificationEq: Equality of two Schema::Ref failed!"
465        );
466        assert!(
467            struct_field_eq_res,
468            "StructFieldEq: Equality of two Schema::Ref failed!"
469        );
470        assert_eq!(specification_eq_res, struct_field_eq_res);
471    }
472
473    #[test]
474    fn test_avro_3939_compare_record_schemata() {
475        let schema_one = Schema::Record(RecordSchema {
476            name: Name::from("record"),
477            doc: None,
478            fields: vec![RecordField {
479                name: "field".to_string(),
480                doc: None,
481                default: None,
482                schema: Schema::Boolean,
483                order: RecordFieldOrder::Ignore,
484                aliases: None,
485                custom_attributes: BTreeMap::new(),
486                position: 0,
487            }],
488            aliases: None,
489            attributes: BTreeMap::new(),
490            lookup: Default::default(),
491        });
492        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
493        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
494
495        let schema_two = Schema::Record(RecordSchema {
496            name: Name::from("record"),
497            doc: None,
498            fields: vec![RecordField {
499                name: "field".to_string(),
500                doc: None,
501                default: None,
502                schema: Schema::Boolean,
503                order: RecordFieldOrder::Ignore,
504                aliases: None,
505                custom_attributes: BTreeMap::new(),
506                position: 0,
507            }],
508            aliases: None,
509            attributes: BTreeMap::new(),
510            lookup: Default::default(),
511        });
512
513        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
514        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
515        assert!(
516            specification_eq_res,
517            "SpecificationEq: Equality of two Schema::Record failed!"
518        );
519        assert!(
520            struct_field_eq_res,
521            "StructFieldEq: Equality of two Schema::Record failed!"
522        );
523        assert_eq!(specification_eq_res, struct_field_eq_res);
524    }
525
526    #[test]
527    fn test_avro_3939_compare_union_schemata() -> TestResult {
528        let schema_one = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
529        assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
530        assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
531
532        let schema_two = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
533
534        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
535        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
536        assert!(
537            specification_eq_res,
538            "SpecificationEq: Equality of two Schema::Union failed!"
539        );
540        assert!(
541            struct_field_eq_res,
542            "StructFieldEq: Equality of two Schema::Union failed!"
543        );
544        assert_eq!(specification_eq_res, struct_field_eq_res);
545        Ok(())
546    }
547}