1use crate::{
19 schema::{
20 ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
21 UnionSchema,
22 },
23 Schema,
24};
25use log::{debug, error};
26use std::{fmt::Debug, sync::OnceLock};
27
28pub trait SchemataEq: Debug + Send + Sync {
31 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
33}
34
35#[derive(Debug)]
39pub struct SpecificationEq;
40impl SchemataEq for SpecificationEq {
41 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
42 schema_one.canonical_form() == schema_two.canonical_form()
43 }
44}
45
/// Comparator that walks two schemata structurally, variant by variant,
/// instead of rendering them to a canonical form first.
#[derive(Debug)]
pub struct StructFieldEq {
    /// When `true`, the custom attributes of non-primitive schemata must also match
    /// for the schemata to be considered equal; when `false` they are ignored.
    pub include_attributes: bool,
}
55
56impl SchemataEq for StructFieldEq {
57 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
58 macro_rules! compare_primitive {
59 ($primitive:ident) => {
60 if let Schema::$primitive = schema_one {
61 if let Schema::$primitive = schema_two {
62 return true;
63 }
64 return false;
65 }
66 };
67 }
68
69 if schema_one.name() != schema_two.name() {
70 return false;
71 }
72
73 compare_primitive!(Null);
74 compare_primitive!(Boolean);
75 compare_primitive!(Int);
76 compare_primitive!(Int);
77 compare_primitive!(Long);
78 compare_primitive!(Float);
79 compare_primitive!(Double);
80 compare_primitive!(Bytes);
81 compare_primitive!(String);
82 compare_primitive!(Uuid);
83 compare_primitive!(BigDecimal);
84 compare_primitive!(Date);
85 compare_primitive!(Duration);
86 compare_primitive!(TimeMicros);
87 compare_primitive!(TimeMillis);
88 compare_primitive!(TimestampMicros);
89 compare_primitive!(TimestampMillis);
90 compare_primitive!(TimestampNanos);
91 compare_primitive!(LocalTimestampMicros);
92 compare_primitive!(LocalTimestampMillis);
93 compare_primitive!(LocalTimestampNanos);
94
95 if self.include_attributes
96 && schema_one.custom_attributes() != schema_two.custom_attributes()
97 {
98 return false;
99 }
100
101 if let Schema::Record(RecordSchema {
102 fields: fields_one, ..
103 }) = schema_one
104 {
105 if let Schema::Record(RecordSchema {
106 fields: fields_two, ..
107 }) = schema_two
108 {
109 return self.compare_fields(fields_one, fields_two);
110 }
111 return false;
112 }
113
114 if let Schema::Enum(EnumSchema {
115 symbols: symbols_one,
116 ..
117 }) = schema_one
118 {
119 if let Schema::Enum(EnumSchema {
120 symbols: symbols_two,
121 ..
122 }) = schema_two
123 {
124 return symbols_one == symbols_two;
125 }
126 return false;
127 }
128
129 if let Schema::Fixed(FixedSchema { size: size_one, .. }) = schema_one {
130 if let Schema::Fixed(FixedSchema { size: size_two, .. }) = schema_two {
131 return size_one == size_two;
132 }
133 return false;
134 }
135
136 if let Schema::Union(UnionSchema {
137 schemas: schemas_one,
138 ..
139 }) = schema_one
140 {
141 if let Schema::Union(UnionSchema {
142 schemas: schemas_two,
143 ..
144 }) = schema_two
145 {
146 return schemas_one.len() == schemas_two.len()
147 && schemas_one
148 .iter()
149 .zip(schemas_two.iter())
150 .all(|(s1, s2)| self.compare(s1, s2));
151 }
152 return false;
153 }
154
155 if let Schema::Decimal(DecimalSchema {
156 precision: precision_one,
157 scale: scale_one,
158 ..
159 }) = schema_one
160 {
161 if let Schema::Decimal(DecimalSchema {
162 precision: precision_two,
163 scale: scale_two,
164 ..
165 }) = schema_two
166 {
167 return precision_one == precision_two && scale_one == scale_two;
168 }
169 return false;
170 }
171
172 if let Schema::Array(ArraySchema {
173 items: items_one, ..
174 }) = schema_one
175 {
176 if let Schema::Array(ArraySchema {
177 items: items_two, ..
178 }) = schema_two
179 {
180 return items_one == items_two;
181 }
182 return false;
183 }
184
185 if let Schema::Map(MapSchema {
186 types: types_one, ..
187 }) = schema_one
188 {
189 if let Schema::Map(MapSchema {
190 types: types_two, ..
191 }) = schema_two
192 {
193 return self.compare(types_one, types_two);
194 }
195 return false;
196 }
197
198 if let Schema::Ref { name: name_one } = schema_one {
199 if let Schema::Ref { name: name_two } = schema_two {
200 return name_one == name_two;
201 }
202 return false;
203 }
204
205 error!(
206 "This is a bug in schema_equality.rs! The following schemata types are not checked! \
207 Please report it to the Avro library maintainers! \
208 \n{:?}\n\n{:?}",
209 schema_one, schema_two
210 );
211 false
212 }
213}
214
215impl StructFieldEq {
216 fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
217 fields_one.len() == fields_two.len()
218 && fields_one
219 .iter()
220 .zip(fields_two.iter())
221 .all(|(f1, f2)| self.compare(&f1.schema, &f2.schema))
222 }
223}
224
225static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
226
227pub fn set_schemata_equality_comparator(
235 comparator: Box<dyn SchemataEq>,
236) -> Result<(), Box<dyn SchemataEq>> {
237 debug!(
238 "Setting a custom schemata equality comparator: {:?}.",
239 comparator
240 );
241 SCHEMATA_COMPARATOR_ONCE.set(comparator)
242}
243
244pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
245 SCHEMATA_COMPARATOR_ONCE
246 .get_or_init(|| {
247 debug!("Going to use the default schemata equality comparator: SpecificationEq.",);
248 Box::new(StructFieldEq {
249 include_attributes: false,
250 })
251 })
252 .compare(schema_one, schema_two)
253}
254
#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
    //! Checks that `SpecificationEq` and `StructFieldEq` agree with each other for
    //! every schema variant, and that `StructFieldEq` honours `include_attributes`.

    use super::*;
    use crate::schema::{Name, RecordFieldOrder};
    use apache_avro_test_helper::TestResult;
    use serde_json::Value;
    use std::collections::BTreeMap;

    const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
    const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
        include_attributes: false,
    };

    // Generates one test per listed unit variant; each test checks that both
    // comparators return the same result when the variant is compared with itself.
    macro_rules! test_primitives {
        ($($primitive:ident),+ $(,)?) => {
            paste::item! {
                $(
                    #[test]
                    fn [<test_avro_3939_compare_schemata_$primitive>]() {
                        let by_specification =
                            SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                        let by_struct_field =
                            STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                        assert_eq!(by_specification, by_struct_field)
                    }
                )+
            }
        };
    }

    test_primitives!(
        Null,
        Boolean,
        Int,
        Long,
        Float,
        Double,
        Bytes,
        String,
        Uuid,
        BigDecimal,
        Date,
        Duration,
        TimeMicros,
        TimeMillis,
        TimestampMicros,
        TimestampMillis,
        TimestampNanos,
        LocalTimestampMicros,
        LocalTimestampMillis,
        LocalTimestampNanos,
    );

    /// Builds a `Schema::Ref` pointing at `name`.
    fn named_ref(name: &str) -> Schema {
        Schema::Ref {
            name: Name::from(name),
        }
    }

    /// Builds a 10-byte precision / scale 2 decimal backed by bytes.
    fn decimal_10_2() -> Schema {
        Schema::Decimal(DecimalSchema {
            precision: 10,
            scale: 2,
            inner: Box::new(Schema::Bytes),
        })
    }

    /// Builds a fixed schema named "fixed" of size 10.
    fn fixed_of_ten() -> Schema {
        Schema::Fixed(FixedSchema {
            name: Name::from("fixed"),
            doc: None,
            size: 10,
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        })
    }

    /// Builds an enum schema named "enum" with the symbols `A` and `B`.
    fn enum_a_b() -> Schema {
        Schema::Enum(EnumSchema {
            name: Name::from("enum"),
            doc: None,
            symbols: vec!["A".to_string(), "B".to_string()],
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        })
    }

    /// Builds a record schema named "record" with a single boolean field "field".
    fn single_field_record() -> Schema {
        let only_field = RecordField {
            name: "field".to_string(),
            doc: None,
            default: None,
            schema: Schema::Boolean,
            order: RecordFieldOrder::Ignore,
            aliases: None,
            custom_attributes: BTreeMap::new(),
            position: 0,
        };

        Schema::Record(RecordSchema {
            name: Name::from("record"),
            doc: None,
            fields: vec![only_field],
            aliases: None,
            attributes: BTreeMap::new(),
            lookup: Default::default(),
        })
    }

    #[test]
    fn test_avro_3939_compare_named_schemata_with_different_names() {
        let first = named_ref("name1");
        let second = named_ref("name2");

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        assert!(!by_specification);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(!by_struct_field);

        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_schemata_not_including_attributes() {
        // Two boolean maps that differ only in the key of their single attribute.
        let boolean_map_tagged = |key: &str| {
            Schema::map_with_attributes(
                Schema::Boolean,
                BTreeMap::from_iter([(key.to_string(), Value::Bool(true))]),
            )
        };
        let first = boolean_map_tagged("key1");
        let second = boolean_map_tagged("key2");

        assert!(STRUCT_FIELD_EQ.compare(&first, &second));
    }

    #[test]
    fn test_avro_3939_compare_schemata_including_attributes() {
        let attribute_aware_eq = StructFieldEq {
            include_attributes: true,
        };
        // Two boolean maps that differ only in the key of their single attribute.
        let boolean_map_tagged = |key: &str| {
            Schema::map_with_attributes(
                Schema::Boolean,
                BTreeMap::from_iter([(key.to_string(), Value::Bool(true))]),
            )
        };
        let first = boolean_map_tagged("key1");
        let second = boolean_map_tagged("key2");

        assert!(!attribute_aware_eq.compare(&first, &second));
    }

    #[test]
    fn test_avro_3939_compare_map_schemata() {
        let first = Schema::map(Schema::Boolean);
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = Schema::map(Schema::Boolean);

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Map failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Map failed!"
        );
        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_array_schemata() {
        let first = Schema::array(Schema::Boolean);
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = Schema::array(Schema::Boolean);

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Array failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Array failed!"
        );
        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_decimal_schemata() {
        let first = decimal_10_2();
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = decimal_10_2();

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Decimal failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Decimal failed!"
        );
        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_fixed_schemata() {
        let first = fixed_of_ten();
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = fixed_of_ten();

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Fixed failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Fixed failed!"
        );
        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_enum_schemata() {
        let first = enum_a_b();
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = enum_a_b();

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Enum failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Enum failed!"
        );
        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_ref_schemata() {
        let first = named_ref("ref");
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = named_ref("ref");

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Ref failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Ref failed!"
        );
        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_record_schemata() {
        let first = single_field_record();
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = single_field_record();

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Record failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Record failed!"
        );
        assert_eq!(by_specification, by_struct_field);
    }

    #[test]
    fn test_avro_3939_compare_union_schemata() -> TestResult {
        let first = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
        assert!(!SPECIFICATION_EQ.compare(&first, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(&first, &Schema::Boolean));

        let second = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);

        let by_specification = SPECIFICATION_EQ.compare(&first, &second);
        let by_struct_field = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(
            by_specification,
            "SpecificationEq: Equality of two Schema::Union failed!"
        );
        assert!(
            by_struct_field,
            "StructFieldEq: Equality of two Schema::Union failed!"
        );
        assert_eq!(by_specification, by_struct_field);
        Ok(())
    }
}