1use crate::schema::{InnerDecimalSchema, UuidSchema};
19use crate::{
20 Schema,
21 schema::{
22 ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
23 UnionSchema,
24 },
25};
26use log::debug;
27use std::{fmt::Debug, sync::OnceLock};
28
29pub trait SchemataEq: Debug + Send + Sync {
32 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
34}
35
36#[derive(Debug)]
40pub struct SpecificationEq;
41impl SchemataEq for SpecificationEq {
42 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
43 schema_one.canonical_form() == schema_two.canonical_form()
44 }
45}
46
47#[derive(Debug)]
51pub struct StructFieldEq {
52 pub include_attributes: bool,
55}
56
57impl SchemataEq for StructFieldEq {
58 #[rustfmt::skip]
59 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
60 if schema_one.name() != schema_two.name() {
61 return false;
62 }
63
64 if self.include_attributes
65 && schema_one.custom_attributes() != schema_two.custom_attributes()
66 {
67 return false;
68 }
69
70 match (schema_one, schema_two) {
71 (Schema::Null, Schema::Null) => true,
72 (Schema::Null, _) => false,
73 (Schema::Boolean, Schema::Boolean) => true,
74 (Schema::Boolean, _) => false,
75 (Schema::Int, Schema::Int) => true,
76 (Schema::Int, _) => false,
77 (Schema::Long, Schema::Long) => true,
78 (Schema::Long, _) => false,
79 (Schema::Float, Schema::Float) => true,
80 (Schema::Float, _) => false,
81 (Schema::Double, Schema::Double) => true,
82 (Schema::Double, _) => false,
83 (Schema::Bytes, Schema::Bytes) => true,
84 (Schema::Bytes, _) => false,
85 (Schema::String, Schema::String) => true,
86 (Schema::String, _) => false,
87 (Schema::BigDecimal, Schema::BigDecimal) => true,
88 (Schema::BigDecimal, _) => false,
89 (Schema::Date, Schema::Date) => true,
90 (Schema::Date, _) => false,
91 (Schema::Duration, Schema::Duration) => true,
92 (Schema::Duration, _) => false,
93 (Schema::TimeMicros, Schema::TimeMicros) => true,
94 (Schema::TimeMicros, _) => false,
95 (Schema::TimeMillis, Schema::TimeMillis) => true,
96 (Schema::TimeMillis, _) => false,
97 (Schema::TimestampMicros, Schema::TimestampMicros) => true,
98 (Schema::TimestampMicros, _) => false,
99 (Schema::TimestampMillis, Schema::TimestampMillis) => true,
100 (Schema::TimestampMillis, _) => false,
101 (Schema::TimestampNanos, Schema::TimestampNanos) => true,
102 (Schema::TimestampNanos, _) => false,
103 (Schema::LocalTimestampMicros, Schema::LocalTimestampMicros) => true,
104 (Schema::LocalTimestampMicros, _) => false,
105 (Schema::LocalTimestampMillis, Schema::LocalTimestampMillis) => true,
106 (Schema::LocalTimestampMillis, _) => false,
107 (Schema::LocalTimestampNanos, Schema::LocalTimestampNanos) => true,
108 (Schema::LocalTimestampNanos, _) => false,
109 (
110 Schema::Record(RecordSchema { fields: fields_one, ..}),
111 Schema::Record(RecordSchema { fields: fields_two, ..})
112 ) => {
113 self.compare_fields(fields_one, fields_two)
114 }
115 (Schema::Record(_), _) => false,
116 (
117 Schema::Enum(EnumSchema { symbols: symbols_one, ..}),
118 Schema::Enum(EnumSchema { symbols: symbols_two, .. })
119 ) => {
120 symbols_one == symbols_two
121 }
122 (Schema::Enum(_), _) => false,
123 (
124 Schema::Fixed(FixedSchema { size: size_one, ..}),
125 Schema::Fixed(FixedSchema { size: size_two, .. })
126 ) => {
127 size_one == size_two
128 }
129 (Schema::Fixed(_), _) => false,
130 (
131 Schema::Union(UnionSchema { schemas: schemas_one, ..}),
132 Schema::Union(UnionSchema { schemas: schemas_two, .. })
133 ) => {
134 schemas_one.len() == schemas_two.len()
135 && schemas_one
136 .iter()
137 .zip(schemas_two.iter())
138 .all(|(s1, s2)| self.compare(s1, s2))
139 }
140 (Schema::Union(_), _) => false,
141 (
142 Schema::Decimal(DecimalSchema { precision: precision_one, scale: scale_one, inner: inner_one }),
143 Schema::Decimal(DecimalSchema { precision: precision_two, scale: scale_two, inner: inner_two })
144 ) => {
145 precision_one == precision_two && scale_one == scale_two && match (inner_one, inner_two) {
146 (InnerDecimalSchema::Bytes, InnerDecimalSchema::Bytes) => true,
147 (InnerDecimalSchema::Fixed(FixedSchema { size: size_one, .. }), InnerDecimalSchema::Fixed(FixedSchema { size: size_two, ..})) => {
148 size_one == size_two
149 }
150 _ => false,
151 }
152 }
153 (Schema::Decimal(_), _) => false,
154 (Schema::Uuid(UuidSchema::Bytes), Schema::Uuid(UuidSchema::Bytes)) => true,
155 (Schema::Uuid(UuidSchema::Bytes), _) => false,
156 (Schema::Uuid(UuidSchema::String), Schema::Uuid(UuidSchema::String)) => true,
157 (Schema::Uuid(UuidSchema::String), _) => false,
158 (Schema::Uuid(UuidSchema::Fixed(FixedSchema { size: size_one, ..})), Schema::Uuid(UuidSchema::Fixed(FixedSchema { size: size_two, ..}))) => {
159 size_one == size_two
160 },
161 (Schema::Uuid(UuidSchema::Fixed(_)), _) => false,
162 (
163 Schema::Array(ArraySchema { items: items_one, ..}),
164 Schema::Array(ArraySchema { items: items_two, ..})
165 ) => {
166 self.compare(items_one, items_two)
167 }
168 (Schema::Array(_), _) => false,
169 (
170 Schema::Map(MapSchema { types: types_one, ..}),
171 Schema::Map(MapSchema { types: types_two, ..})
172 ) => {
173 self.compare(types_one, types_two)
174 }
175 (Schema::Map(_), _) => false,
176 (
177 Schema::Ref { name: name_one },
178 Schema::Ref { name: name_two }
179 ) => {
180 name_one == name_two
181 }
182 (Schema::Ref { .. }, _) => false,
183 }
184 }
185}
186
187impl StructFieldEq {
188 fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
189 fields_one.len() == fields_two.len()
190 && fields_one
191 .iter()
192 .zip(fields_two.iter())
193 .all(|(f1, f2)| f1.name == f2.name && self.compare(&f1.schema, &f2.schema))
194 }
195}
196
197static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
198
199pub fn set_schemata_equality_comparator(
207 comparator: Box<dyn SchemataEq>,
208) -> Result<(), Box<dyn SchemataEq>> {
209 debug!("Setting a custom schemata equality comparator: {comparator:?}.");
210 SCHEMATA_COMPARATOR_ONCE.set(comparator)
211}
212
213pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
214 SCHEMATA_COMPARATOR_ONCE
215 .get_or_init(|| {
216 debug!("Going to use the default schemata equality comparator: StructFieldEq.",);
217 Box::new(StructFieldEq {
218 include_attributes: false,
219 })
220 })
221 .compare(schema_one, schema_two)
222}
223
224#[cfg(test)]
225#[allow(non_snake_case)]
226mod tests {
227 use super::*;
228 use crate::schema::{InnerDecimalSchema, Name, RecordFieldOrder};
229 use apache_avro_test_helper::TestResult;
230 use serde_json::Value;
231 use std::collections::BTreeMap;
232
233 const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
234 const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
235 include_attributes: false,
236 };
237
238 macro_rules! test_primitives {
239 ($primitive:ident) => {
240 paste::item! {
241 #[test]
242 fn [<test_avro_3939_compare_schemata_$primitive>]() {
243 let specification_eq_res = SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
244 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
245 assert_eq!(specification_eq_res, struct_field_eq_res)
246 }
247 }
248 };
249 }
250
251 test_primitives!(Null);
252 test_primitives!(Boolean);
253 test_primitives!(Int);
254 test_primitives!(Long);
255 test_primitives!(Float);
256 test_primitives!(Double);
257 test_primitives!(Bytes);
258 test_primitives!(String);
259 test_primitives!(BigDecimal);
260 test_primitives!(Date);
261 test_primitives!(Duration);
262 test_primitives!(TimeMicros);
263 test_primitives!(TimeMillis);
264 test_primitives!(TimestampMicros);
265 test_primitives!(TimestampMillis);
266 test_primitives!(TimestampNanos);
267 test_primitives!(LocalTimestampMicros);
268 test_primitives!(LocalTimestampMillis);
269 test_primitives!(LocalTimestampNanos);
270
271 #[test]
272 fn test_avro_3939_compare_named_schemata_with_different_names() {
273 let schema_one = Schema::Ref {
274 name: Name::from("name1"),
275 };
276
277 let schema_two = Schema::Ref {
278 name: Name::from("name2"),
279 };
280
281 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
282 assert!(!specification_eq_res);
283 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
284 assert!(!struct_field_eq_res);
285
286 assert_eq!(specification_eq_res, struct_field_eq_res);
287 }
288
289 #[test]
290 fn test_avro_3939_compare_schemata_not_including_attributes() {
291 let schema_one = Schema::map_with_attributes(
292 Schema::Boolean,
293 BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
294 );
295 let schema_two = Schema::map_with_attributes(
296 Schema::Boolean,
297 BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
298 );
299 assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two));
301 }
302
303 #[test]
304 fn test_avro_3939_compare_schemata_including_attributes() {
305 let struct_field_eq = StructFieldEq {
306 include_attributes: true,
307 };
308 let schema_one = Schema::map_with_attributes(
309 Schema::Boolean,
310 BTreeMap::from_iter([("key1".to_string(), Value::Bool(true))]),
311 );
312 let schema_two = Schema::map_with_attributes(
313 Schema::Boolean,
314 BTreeMap::from_iter([("key2".to_string(), Value::Bool(true))]),
315 );
316 assert!(!struct_field_eq.compare(&schema_one, &schema_two));
317 }
318
319 #[test]
320 fn test_avro_3939_compare_map_schemata() {
321 let schema_one = Schema::map(Schema::Boolean);
322 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
323 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
324
325 let schema_two = Schema::map(Schema::Boolean);
326
327 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
328 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
329 assert!(
330 specification_eq_res,
331 "SpecificationEq: Equality of two Schema::Map failed!"
332 );
333 assert!(
334 struct_field_eq_res,
335 "StructFieldEq: Equality of two Schema::Map failed!"
336 );
337 assert_eq!(specification_eq_res, struct_field_eq_res);
338 }
339
340 #[test]
341 fn test_avro_3939_compare_array_schemata() {
342 let schema_one = Schema::array(Schema::Boolean);
343 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
344 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
345
346 let schema_two = Schema::array(Schema::Boolean);
347
348 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
349 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
350 assert!(
351 specification_eq_res,
352 "SpecificationEq: Equality of two Schema::Array failed!"
353 );
354 assert!(
355 struct_field_eq_res,
356 "StructFieldEq: Equality of two Schema::Array failed!"
357 );
358 assert_eq!(specification_eq_res, struct_field_eq_res);
359 }
360
361 #[test]
362 fn test_avro_3939_compare_decimal_schemata() {
363 let schema_one = Schema::Decimal(DecimalSchema {
364 precision: 10,
365 scale: 2,
366 inner: InnerDecimalSchema::Bytes,
367 });
368 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
369 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
370
371 let schema_two = Schema::Decimal(DecimalSchema {
372 precision: 10,
373 scale: 2,
374 inner: InnerDecimalSchema::Bytes,
375 });
376
377 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
378 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
379 assert!(
380 specification_eq_res,
381 "SpecificationEq: Equality of two Schema::Decimal failed!"
382 );
383 assert!(
384 struct_field_eq_res,
385 "StructFieldEq: Equality of two Schema::Decimal failed!"
386 );
387 assert_eq!(specification_eq_res, struct_field_eq_res);
388 }
389
390 #[test]
391 fn test_avro_3939_compare_fixed_schemata() {
392 let schema_one = Schema::Fixed(FixedSchema {
393 name: Name::from("fixed"),
394 doc: None,
395 size: 10,
396 default: None,
397 aliases: None,
398 attributes: BTreeMap::new(),
399 });
400 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
401 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
402
403 let schema_two = Schema::Fixed(FixedSchema {
404 name: Name::from("fixed"),
405 doc: None,
406 size: 10,
407 default: None,
408 aliases: None,
409 attributes: BTreeMap::new(),
410 });
411
412 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
413 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
414 assert!(
415 specification_eq_res,
416 "SpecificationEq: Equality of two Schema::Fixed failed!"
417 );
418 assert!(
419 struct_field_eq_res,
420 "StructFieldEq: Equality of two Schema::Fixed failed!"
421 );
422 assert_eq!(specification_eq_res, struct_field_eq_res);
423 }
424
425 #[test]
426 fn test_avro_3939_compare_enum_schemata() {
427 let schema_one = Schema::Enum(EnumSchema {
428 name: Name::from("enum"),
429 doc: None,
430 symbols: vec!["A".to_string(), "B".to_string()],
431 default: None,
432 aliases: None,
433 attributes: BTreeMap::new(),
434 });
435 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
436 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
437
438 let schema_two = Schema::Enum(EnumSchema {
439 name: Name::from("enum"),
440 doc: None,
441 symbols: vec!["A".to_string(), "B".to_string()],
442 default: None,
443 aliases: None,
444 attributes: BTreeMap::new(),
445 });
446
447 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
448 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
449 assert!(
450 specification_eq_res,
451 "SpecificationEq: Equality of two Schema::Enum failed!"
452 );
453 assert!(
454 struct_field_eq_res,
455 "StructFieldEq: Equality of two Schema::Enum failed!"
456 );
457 assert_eq!(specification_eq_res, struct_field_eq_res);
458 }
459
460 #[test]
461 fn test_avro_3939_compare_ref_schemata() {
462 let schema_one = Schema::Ref {
463 name: Name::from("ref"),
464 };
465 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
466 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
467
468 let schema_two = Schema::Ref {
469 name: Name::from("ref"),
470 };
471
472 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
473 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
474 assert!(
475 specification_eq_res,
476 "SpecificationEq: Equality of two Schema::Ref failed!"
477 );
478 assert!(
479 struct_field_eq_res,
480 "StructFieldEq: Equality of two Schema::Ref failed!"
481 );
482 assert_eq!(specification_eq_res, struct_field_eq_res);
483 }
484
485 #[test]
486 fn test_avro_3939_compare_record_schemata() {
487 let schema_one = Schema::Record(RecordSchema {
488 name: Name::from("record"),
489 doc: None,
490 fields: vec![RecordField {
491 name: "field".to_string(),
492 doc: None,
493 default: None,
494 schema: Schema::Boolean,
495 order: RecordFieldOrder::Ignore,
496 aliases: None,
497 custom_attributes: BTreeMap::new(),
498 position: 0,
499 }],
500 aliases: None,
501 attributes: BTreeMap::new(),
502 lookup: Default::default(),
503 });
504 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
505 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
506
507 let schema_two = Schema::Record(RecordSchema {
508 name: Name::from("record"),
509 doc: None,
510 fields: vec![RecordField {
511 name: "field".to_string(),
512 doc: None,
513 default: None,
514 schema: Schema::Boolean,
515 order: RecordFieldOrder::Ignore,
516 aliases: None,
517 custom_attributes: BTreeMap::new(),
518 position: 0,
519 }],
520 aliases: None,
521 attributes: BTreeMap::new(),
522 lookup: Default::default(),
523 });
524
525 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
526 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
527 assert!(
528 specification_eq_res,
529 "SpecificationEq: Equality of two Schema::Record failed!"
530 );
531 assert!(
532 struct_field_eq_res,
533 "StructFieldEq: Equality of two Schema::Record failed!"
534 );
535 assert_eq!(specification_eq_res, struct_field_eq_res);
536 }
537
538 #[test]
539 fn test_avro_3939_compare_union_schemata() -> TestResult {
540 let schema_one = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
541 assert!(!SPECIFICATION_EQ.compare(&schema_one, &Schema::Boolean));
542 assert!(!STRUCT_FIELD_EQ.compare(&schema_one, &Schema::Boolean));
543
544 let schema_two = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
545
546 let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
547 let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
548 assert!(
549 specification_eq_res,
550 "SpecificationEq: Equality of two Schema::Union failed!"
551 );
552 assert!(
553 struct_field_eq_res,
554 "StructFieldEq: Equality of two Schema::Union failed!"
555 );
556 assert_eq!(specification_eq_res, struct_field_eq_res);
557 Ok(())
558 }
559
560 #[test]
561 fn test_uuid_compare_uuid() -> TestResult {
562 let string = Schema::Uuid(UuidSchema::String);
563 let bytes = Schema::Uuid(UuidSchema::Bytes);
564 let mut fixed_schema = FixedSchema {
565 name: Name {
566 name: "some_name".to_string(),
567 namespace: None,
568 },
569 aliases: None,
570 doc: None,
571 size: 16,
572 default: None,
573 attributes: Default::default(),
574 };
575 let fixed = Schema::Uuid(UuidSchema::Fixed(fixed_schema.clone()));
576 fixed_schema
577 .attributes
578 .insert("Something".to_string(), Value::Null);
579 let fixed_different = Schema::Uuid(UuidSchema::Fixed(fixed_schema));
580
581 assert!(SPECIFICATION_EQ.compare(&string, &string));
582 assert!(STRUCT_FIELD_EQ.compare(&string, &string));
583 assert!(SPECIFICATION_EQ.compare(&bytes, &bytes));
584 assert!(STRUCT_FIELD_EQ.compare(&bytes, &bytes));
585 assert!(SPECIFICATION_EQ.compare(&fixed, &fixed));
586 assert!(STRUCT_FIELD_EQ.compare(&fixed, &fixed));
587
588 assert!(!SPECIFICATION_EQ.compare(&string, &bytes));
589 assert!(!STRUCT_FIELD_EQ.compare(&string, &bytes));
590 assert!(!SPECIFICATION_EQ.compare(&bytes, &string));
591 assert!(!STRUCT_FIELD_EQ.compare(&bytes, &string));
592 assert!(!SPECIFICATION_EQ.compare(&string, &fixed));
593 assert!(!STRUCT_FIELD_EQ.compare(&string, &fixed));
594 assert!(!SPECIFICATION_EQ.compare(&fixed, &string));
595 assert!(!STRUCT_FIELD_EQ.compare(&fixed, &string));
596 assert!(!SPECIFICATION_EQ.compare(&bytes, &fixed));
597 assert!(!STRUCT_FIELD_EQ.compare(&bytes, &fixed));
598 assert!(!SPECIFICATION_EQ.compare(&fixed, &bytes));
599 assert!(!STRUCT_FIELD_EQ.compare(&fixed, &bytes));
600
601 assert!(SPECIFICATION_EQ.compare(&fixed, &fixed_different));
602 assert!(STRUCT_FIELD_EQ.compare(&fixed, &fixed_different));
603 assert!(SPECIFICATION_EQ.compare(&fixed_different, &fixed));
604 assert!(STRUCT_FIELD_EQ.compare(&fixed_different, &fixed));
605
606 let strict = StructFieldEq {
607 include_attributes: true,
608 };
609
610 assert!(!strict.compare(&fixed, &fixed_different));
611 assert!(!strict.compare(&fixed_different, &fixed));
612
613 Ok(())
614 }
615}