1use crate::{
19 schema::{
20 ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
21 UnionSchema,
22 },
23 Schema,
24};
25use log::{debug, error};
26use std::{fmt::Debug, sync::OnceLock};
27
28pub trait SchemataEq: Debug + Send + Sync {
31 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
33}
34
35#[derive(Debug)]
39pub struct SpecificationEq;
40impl SchemataEq for SpecificationEq {
41 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
42 schema_one.canonical_form() == schema_two.canonical_form()
43 }
44}
45
/// A [`SchemataEq`] implementation that compares schemata structurally, variant by variant,
/// and stops at the first difference it finds.
#[derive(Debug)]
pub struct StructFieldEq {
    /// When `true`, two non-primitive schemata are only equal if their
    /// `custom_attributes()` are equal as well; when `false`, custom attributes are ignored.
    pub include_attributes: bool,
}
55
56impl SchemataEq for StructFieldEq {
57 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
58 macro_rules! compare_primitive {
59 ($primitive:ident) => {
60 if let Schema::$primitive = schema_one {
61 if let Schema::$primitive = schema_two {
62 return true;
63 }
64 return false;
65 }
66 };
67 }
68
69 if schema_one.name() != schema_two.name() {
70 return false;
71 }
72
73 compare_primitive!(Null);
74 compare_primitive!(Boolean);
75 compare_primitive!(Int);
76 compare_primitive!(Int);
77 compare_primitive!(Long);
78 compare_primitive!(Float);
79 compare_primitive!(Double);
80 compare_primitive!(Bytes);
81 compare_primitive!(String);
82 compare_primitive!(Uuid);
83 compare_primitive!(BigDecimal);
84 compare_primitive!(Date);
85 compare_primitive!(Duration);
86 compare_primitive!(TimeMicros);
87 compare_primitive!(TimeMillis);
88 compare_primitive!(TimestampMicros);
89 compare_primitive!(TimestampMillis);
90 compare_primitive!(TimestampNanos);
91 compare_primitive!(LocalTimestampMicros);
92 compare_primitive!(LocalTimestampMillis);
93 compare_primitive!(LocalTimestampNanos);
94
95 if self.include_attributes
96 && schema_one.custom_attributes() != schema_two.custom_attributes()
97 {
98 return false;
99 }
100
101 if let Schema::Record(RecordSchema {
102 fields: fields_one, ..
103 }) = schema_one
104 {
105 if let Schema::Record(RecordSchema {
106 fields: fields_two, ..
107 }) = schema_two
108 {
109 return self.compare_fields(fields_one, fields_two);
110 }
111 return false;
112 }
113
114 if let Schema::Enum(EnumSchema {
115 symbols: symbols_one,
116 ..
117 }) = schema_one
118 {
119 if let Schema::Enum(EnumSchema {
120 symbols: symbols_two,
121 ..
122 }) = schema_two
123 {
124 return symbols_one == symbols_two;
125 }
126 return false;
127 }
128
129 if let Schema::Fixed(FixedSchema { size: size_one, .. }) = schema_one {
130 if let Schema::Fixed(FixedSchema { size: size_two, .. }) = schema_two {
131 return size_one == size_two;
132 }
133 return false;
134 }
135
136 if let Schema::Union(UnionSchema {
137 schemas: schemas_one,
138 ..
139 }) = schema_one
140 {
141 if let Schema::Union(UnionSchema {
142 schemas: schemas_two,
143 ..
144 }) = schema_two
145 {
146 return schemas_one.len() == schemas_two.len()
147 && schemas_one
148 .iter()
149 .zip(schemas_two.iter())
150 .all(|(s1, s2)| self.compare(s1, s2));
151 }
152 return false;
153 }
154
155 if let Schema::Decimal(DecimalSchema {
156 precision: precision_one,
157 scale: scale_one,
158 ..
159 }) = schema_one
160 {
161 if let Schema::Decimal(DecimalSchema {
162 precision: precision_two,
163 scale: scale_two,
164 ..
165 }) = schema_two
166 {
167 return precision_one == precision_two && scale_one == scale_two;
168 }
169 return false;
170 }
171
172 if let Schema::Array(ArraySchema {
173 items: items_one, ..
174 }) = schema_one
175 {
176 if let Schema::Array(ArraySchema {
177 items: items_two, ..
178 }) = schema_two
179 {
180 return items_one == items_two;
181 }
182 return false;
183 }
184
185 if let Schema::Map(MapSchema {
186 types: types_one, ..
187 }) = schema_one
188 {
189 if let Schema::Map(MapSchema {
190 types: types_two, ..
191 }) = schema_two
192 {
193 return self.compare(types_one, types_two);
194 }
195 return false;
196 }
197
198 if let Schema::Ref { name: name_one } = schema_one {
199 if let Schema::Ref { name: name_two } = schema_two {
200 return name_one == name_two;
201 }
202 return false;
203 }
204
205 error!(
206 "This is a bug in schema_equality.rs! The following schemata types are not checked! \
207 Please report it to the Avro library maintainers! \
208 \n{schema_one:?}\n\n{schema_two:?}"
209 );
210 false
211 }
212}
213
214impl StructFieldEq {
215 fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
216 fields_one.len() == fields_two.len()
217 && fields_one
218 .iter()
219 .zip(fields_two.iter())
220 .all(|(f1, f2)| self.compare(&f1.schema, &f2.schema))
221 }
222}
223
224static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
225
226pub fn set_schemata_equality_comparator(
234 comparator: Box<dyn SchemataEq>,
235) -> Result<(), Box<dyn SchemataEq>> {
236 debug!("Setting a custom schemata equality comparator: {comparator:?}.");
237 SCHEMATA_COMPARATOR_ONCE.set(comparator)
238}
239
240pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
241 SCHEMATA_COMPARATOR_ONCE
242 .get_or_init(|| {
243 debug!("Going to use the default schemata equality comparator: SpecificationEq.",);
244 Box::new(StructFieldEq {
245 include_attributes: false,
246 })
247 })
248 .compare(schema_one, schema_two)
249}
250
#[cfg(test)]
// The generated primitive test names embed the CamelCase variant name.
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use crate::schema::{Name, RecordFieldOrder};
    use apache_avro_test_helper::TestResult;
    use serde_json::Value;
    use std::collections::BTreeMap;

    const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
    const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
        include_attributes: false,
    };

    /// Asserts that neither comparator equates `schema` with `Schema::Boolean`.
    #[track_caller]
    fn assert_not_equal_to_boolean(schema: &Schema) {
        assert!(!SPECIFICATION_EQ.compare(schema, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(schema, &Schema::Boolean));
    }

    /// Asserts that both comparators report the two schemata as equal and agree with each
    /// other. `variant` is the `Schema` variant name used in the failure messages.
    #[track_caller]
    fn assert_equal_for_both_comparators(schema_one: &Schema, schema_two: &Schema, variant: &str) {
        let specification_eq_res = SPECIFICATION_EQ.compare(schema_one, schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(schema_one, schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::{} failed!",
            variant
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::{} failed!",
            variant
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    /// Builds a map-of-boolean schema carrying the single custom attribute `key: true`.
    fn boolean_map_with_attribute(key: &str) -> Schema {
        Schema::map_with_attributes(
            Schema::Boolean,
            BTreeMap::from_iter([(key.to_string(), Value::Bool(true))]),
        )
    }

    // Generates one test per listed unit variant, checking that both comparators give the
    // same answer when the variant is compared with itself.
    macro_rules! test_primitives {
        ($($primitive:ident),+ $(,)?) => {
            $(
                paste::item! {
                    #[test]
                    fn [<test_avro_3939_compare_schemata_$primitive>]() {
                        let by_specification =
                            SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                        let by_struct_field =
                            STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                        assert_eq!(by_specification, by_struct_field)
                    }
                }
            )+
        };
    }

    test_primitives!(
        Null,
        Boolean,
        Int,
        Long,
        Float,
        Double,
        Bytes,
        String,
        Uuid,
        BigDecimal,
        Date,
        Duration,
        TimeMicros,
        TimeMillis,
        TimestampMicros,
        TimestampMillis,
        TimestampNanos,
        LocalTimestampMicros,
        LocalTimestampMillis,
        LocalTimestampNanos,
    );

    #[test]
    fn test_avro_3939_compare_named_schemata_with_different_names() {
        let first = Schema::Ref {
            name: Name::from("name1"),
        };
        let second = Schema::Ref {
            name: Name::from("name2"),
        };

        let specification_eq_res = SPECIFICATION_EQ.compare(&first, &second);
        assert!(!specification_eq_res);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&first, &second);
        assert!(!struct_field_eq_res);

        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    #[test]
    fn test_avro_3939_compare_schemata_not_including_attributes() {
        let with_key1 = boolean_map_with_attribute("key1");
        let with_key2 = boolean_map_with_attribute("key2");
        // STRUCT_FIELD_EQ is configured with `include_attributes: false`.
        assert!(STRUCT_FIELD_EQ.compare(&with_key1, &with_key2));
    }

    #[test]
    fn test_avro_3939_compare_schemata_including_attributes() {
        let attribute_aware = StructFieldEq {
            include_attributes: true,
        };
        let with_key1 = boolean_map_with_attribute("key1");
        let with_key2 = boolean_map_with_attribute("key2");
        assert!(!attribute_aware.compare(&with_key1, &with_key2));
    }

    #[test]
    fn test_avro_3939_compare_map_schemata() {
        let schema_one = Schema::map(Schema::Boolean);
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = Schema::map(Schema::Boolean);
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Map");
    }

    #[test]
    fn test_avro_3939_compare_array_schemata() {
        let schema_one = Schema::array(Schema::Boolean);
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = Schema::array(Schema::Boolean);
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Array");
    }

    #[test]
    fn test_avro_3939_compare_decimal_schemata() {
        let build_decimal = || {
            Schema::Decimal(DecimalSchema {
                precision: 10,
                scale: 2,
                inner: Box::new(Schema::Bytes),
            })
        };

        let schema_one = build_decimal();
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = build_decimal();
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Decimal");
    }

    #[test]
    fn test_avro_3939_compare_fixed_schemata() {
        let build_fixed = || {
            Schema::Fixed(FixedSchema {
                name: Name::from("fixed"),
                doc: None,
                size: 10,
                default: None,
                aliases: None,
                attributes: BTreeMap::new(),
            })
        };

        let schema_one = build_fixed();
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = build_fixed();
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Fixed");
    }

    #[test]
    fn test_avro_3939_compare_enum_schemata() {
        let build_enum = || {
            Schema::Enum(EnumSchema {
                name: Name::from("enum"),
                doc: None,
                symbols: vec!["A".to_string(), "B".to_string()],
                default: None,
                aliases: None,
                attributes: BTreeMap::new(),
            })
        };

        let schema_one = build_enum();
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = build_enum();
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Enum");
    }

    #[test]
    fn test_avro_3939_compare_ref_schemata() {
        let build_ref = || Schema::Ref {
            name: Name::from("ref"),
        };

        let schema_one = build_ref();
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = build_ref();
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Ref");
    }

    #[test]
    fn test_avro_3939_compare_record_schemata() {
        let build_record = || {
            Schema::Record(RecordSchema {
                name: Name::from("record"),
                doc: None,
                fields: vec![RecordField {
                    name: "field".to_string(),
                    doc: None,
                    default: None,
                    schema: Schema::Boolean,
                    order: RecordFieldOrder::Ignore,
                    aliases: None,
                    custom_attributes: BTreeMap::new(),
                    position: 0,
                }],
                aliases: None,
                attributes: BTreeMap::new(),
                lookup: Default::default(),
            })
        };

        let schema_one = build_record();
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = build_record();
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Record");
    }

    #[test]
    fn test_avro_3939_compare_union_schemata() -> TestResult {
        let build_union = || UnionSchema::new(vec![Schema::Boolean, Schema::Int]).map(Schema::Union);

        let schema_one = build_union()?;
        assert_not_equal_to_boolean(&schema_one);

        let schema_two = build_union()?;
        assert_equal_for_both_comparators(&schema_one, &schema_two, "Union");
        Ok(())
    }
}