1use crate::{
19 Schema,
20 schema::{
21 ArraySchema, DecimalSchema, EnumSchema, FixedSchema, MapSchema, RecordField, RecordSchema,
22 UnionSchema,
23 },
24};
25use log::debug;
26use std::{fmt::Debug, sync::OnceLock};
27
28pub trait SchemataEq: Debug + Send + Sync {
31 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool;
33}
34
35#[derive(Debug)]
39pub struct SpecificationEq;
40impl SchemataEq for SpecificationEq {
41 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
42 schema_one.canonical_form() == schema_two.canonical_form()
43 }
44}
45
/// A [`SchemataEq`] strategy that walks both schemata and compares them variant by variant.
#[derive(Debug)]
pub struct StructFieldEq {
    /// When `true`, two schemata are only equal if their `custom_attributes()` are equal too.
    /// When `false`, custom attributes are not consulted at all.
    pub include_attributes: bool,
}
55
56impl SchemataEq for StructFieldEq {
57 #[rustfmt::skip]
58 fn compare(&self, schema_one: &Schema, schema_two: &Schema) -> bool {
59 if schema_one.name() != schema_two.name() {
60 return false;
61 }
62
63 if self.include_attributes
64 && schema_one.custom_attributes() != schema_two.custom_attributes()
65 {
66 return false;
67 }
68
69 match (schema_one, schema_two) {
70 (Schema::Null, Schema::Null) => true,
71 (Schema::Null, _) => false,
72 (Schema::Boolean, Schema::Boolean) => true,
73 (Schema::Boolean, _) => false,
74 (Schema::Int, Schema::Int) => true,
75 (Schema::Int, _) => false,
76 (Schema::Long, Schema::Long) => true,
77 (Schema::Long, _) => false,
78 (Schema::Float, Schema::Float) => true,
79 (Schema::Float, _) => false,
80 (Schema::Double, Schema::Double) => true,
81 (Schema::Double, _) => false,
82 (Schema::Bytes, Schema::Bytes) => true,
83 (Schema::Bytes, _) => false,
84 (Schema::String, Schema::String) => true,
85 (Schema::String, _) => false,
86 (Schema::Uuid, Schema::Uuid) => true,
87 (Schema::Uuid, _) => false,
88 (Schema::BigDecimal, Schema::BigDecimal) => true,
89 (Schema::BigDecimal, _) => false,
90 (Schema::Date, Schema::Date) => true,
91 (Schema::Date, _) => false,
92 (Schema::Duration, Schema::Duration) => true,
93 (Schema::Duration, _) => false,
94 (Schema::TimeMicros, Schema::TimeMicros) => true,
95 (Schema::TimeMicros, _) => false,
96 (Schema::TimeMillis, Schema::TimeMillis) => true,
97 (Schema::TimeMillis, _) => false,
98 (Schema::TimestampMicros, Schema::TimestampMicros) => true,
99 (Schema::TimestampMicros, _) => false,
100 (Schema::TimestampMillis, Schema::TimestampMillis) => true,
101 (Schema::TimestampMillis, _) => false,
102 (Schema::TimestampNanos, Schema::TimestampNanos) => true,
103 (Schema::TimestampNanos, _) => false,
104 (Schema::LocalTimestampMicros, Schema::LocalTimestampMicros) => true,
105 (Schema::LocalTimestampMicros, _) => false,
106 (Schema::LocalTimestampMillis, Schema::LocalTimestampMillis) => true,
107 (Schema::LocalTimestampMillis, _) => false,
108 (Schema::LocalTimestampNanos, Schema::LocalTimestampNanos) => true,
109 (Schema::LocalTimestampNanos, _) => false,
110 (
111 Schema::Record(RecordSchema { fields: fields_one, ..}),
112 Schema::Record(RecordSchema { fields: fields_two, ..})
113 ) => {
114 self.compare_fields(fields_one, fields_two)
115 }
116 (Schema::Record(_), _) => false,
117 (
118 Schema::Enum(EnumSchema { symbols: symbols_one, ..}),
119 Schema::Enum(EnumSchema { symbols: symbols_two, .. })
120 ) => {
121 symbols_one == symbols_two
122 }
123 (Schema::Enum(_), _) => false,
124 (
125 Schema::Fixed(FixedSchema { size: size_one, ..}),
126 Schema::Fixed(FixedSchema { size: size_two, .. })
127 ) => {
128 size_one == size_two
129 }
130 (Schema::Fixed(_), _) => false,
131 (
132 Schema::Union(UnionSchema { schemas: schemas_one, ..}),
133 Schema::Union(UnionSchema { schemas: schemas_two, .. })
134 ) => {
135 schemas_one.len() == schemas_two.len()
136 && schemas_one
137 .iter()
138 .zip(schemas_two.iter())
139 .all(|(s1, s2)| self.compare(s1, s2))
140 }
141 (Schema::Union(_), _) => false,
142 (
143 Schema::Decimal(DecimalSchema { precision: precision_one, scale: scale_one, inner: inner_one }),
144 Schema::Decimal(DecimalSchema { precision: precision_two, scale: scale_two, inner: inner_two })
145 ) => {
146 precision_one == precision_two && scale_one == scale_two && self.compare(inner_one, inner_two)
147 }
148 (Schema::Decimal(_), _) => false,
149 (
150 Schema::Array(ArraySchema { items: items_one, ..}),
151 Schema::Array(ArraySchema { items: items_two, ..})
152 ) => {
153 self.compare(items_one, items_two)
154 }
155 (Schema::Array(_), _) => false,
156 (
157 Schema::Map(MapSchema { types: types_one, ..}),
158 Schema::Map(MapSchema { types: types_two, ..})
159 ) => {
160 self.compare(types_one, types_two)
161 }
162 (Schema::Map(_), _) => false,
163 (
164 Schema::Ref { name: name_one },
165 Schema::Ref { name: name_two }
166 ) => {
167 name_one == name_two
168 }
169 (Schema::Ref { .. }, _) => false,
170 }
171 }
172}
173
174impl StructFieldEq {
175 fn compare_fields(&self, fields_one: &[RecordField], fields_two: &[RecordField]) -> bool {
176 fields_one.len() == fields_two.len()
177 && fields_one
178 .iter()
179 .zip(fields_two.iter())
180 .all(|(f1, f2)| f1.name == f2.name && self.compare(&f1.schema, &f2.schema))
181 }
182}
183
184static SCHEMATA_COMPARATOR_ONCE: OnceLock<Box<dyn SchemataEq>> = OnceLock::new();
185
186pub fn set_schemata_equality_comparator(
194 comparator: Box<dyn SchemataEq>,
195) -> Result<(), Box<dyn SchemataEq>> {
196 debug!("Setting a custom schemata equality comparator: {comparator:?}.");
197 SCHEMATA_COMPARATOR_ONCE.set(comparator)
198}
199
200pub(crate) fn compare_schemata(schema_one: &Schema, schema_two: &Schema) -> bool {
201 SCHEMATA_COMPARATOR_ONCE
202 .get_or_init(|| {
203 debug!("Going to use the default schemata equality comparator: StructFieldEq.",);
204 Box::new(StructFieldEq {
205 include_attributes: false,
206 })
207 })
208 .compare(schema_one, schema_two)
209}
210
#[cfg(test)]
// `test_primitives!` pastes variant names such as `Null` into the generated test function names,
// so those names are not snake_case.
#[allow(non_snake_case)]
mod tests {
    use super::*;
    use crate::schema::{Name, RecordFieldOrder};
    use apache_avro_test_helper::TestResult;
    use serde_json::Value;
    use std::collections::BTreeMap;

    const SPECIFICATION_EQ: SpecificationEq = SpecificationEq;
    const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
        include_attributes: false,
    };

    /// Asserts that neither comparator considers `schema` equal to `Schema::Boolean`.
    ///
    /// `#[track_caller]` makes a failure point at the calling test instead of this helper.
    #[track_caller]
    fn assert_differs_from_boolean(schema: &Schema) {
        assert!(!SPECIFICATION_EQ.compare(schema, &Schema::Boolean));
        assert!(!STRUCT_FIELD_EQ.compare(schema, &Schema::Boolean));
    }

    /// Asserts that both comparators consider the two schemata equal and agree with each other.
    ///
    /// `variant` is the `Schema` variant name used in the failure messages.
    #[track_caller]
    fn assert_both_report_equal(schema_one: &Schema, schema_two: &Schema, variant: &str) {
        let specification_eq_res = SPECIFICATION_EQ.compare(schema_one, schema_two);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(schema_one, schema_two);
        assert!(
            specification_eq_res,
            "SpecificationEq: Equality of two Schema::{variant} failed!"
        );
        assert!(
            struct_field_eq_res,
            "StructFieldEq: Equality of two Schema::{variant} failed!"
        );
        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    /// Builds a boolean map schema carrying the single custom attribute `key: true`.
    fn boolean_map_with_attribute(key: &str) -> Schema {
        Schema::map_with_attributes(
            Schema::Boolean,
            BTreeMap::from_iter([(key.to_string(), Value::Bool(true))]),
        )
    }

    /// Builds the `decimal(10, 2)` over `bytes` fixture.
    fn decimal_fixture() -> Schema {
        Schema::Decimal(DecimalSchema {
            precision: 10,
            scale: 2,
            inner: Box::new(Schema::Bytes),
        })
    }

    /// Builds the fixed fixture named `fixed` with size 10.
    fn fixed_fixture() -> Schema {
        Schema::Fixed(FixedSchema {
            name: Name::from("fixed"),
            doc: None,
            size: 10,
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        })
    }

    /// Builds the enum fixture named `enum` with the symbols `A` and `B`.
    fn enum_fixture() -> Schema {
        Schema::Enum(EnumSchema {
            name: Name::from("enum"),
            doc: None,
            symbols: vec!["A".to_string(), "B".to_string()],
            default: None,
            aliases: None,
            attributes: BTreeMap::new(),
        })
    }

    /// Builds the record fixture named `record` with a single boolean field named `field`.
    fn record_fixture() -> Schema {
        Schema::Record(RecordSchema {
            name: Name::from("record"),
            doc: None,
            fields: vec![RecordField {
                name: "field".to_string(),
                doc: None,
                default: None,
                schema: Schema::Boolean,
                order: RecordFieldOrder::Ignore,
                aliases: None,
                custom_attributes: BTreeMap::new(),
                position: 0,
            }],
            aliases: None,
            attributes: BTreeMap::new(),
            lookup: Default::default(),
        })
    }

    /// Generates a test checking that both comparators give the same answer when a payload-free
    /// `Schema` variant is compared with itself.
    macro_rules! test_primitives {
        ($primitive:ident) => {
            paste::item! {
                #[test]
                fn [<test_avro_3939_compare_schemata_$primitive>]() {
                    let specification_eq_res = SPECIFICATION_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                    let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&Schema::$primitive, &Schema::$primitive);
                    assert_eq!(specification_eq_res, struct_field_eq_res)
                }
            }
        };
    }

    test_primitives!(Null);
    test_primitives!(Boolean);
    test_primitives!(Int);
    test_primitives!(Long);
    test_primitives!(Float);
    test_primitives!(Double);
    test_primitives!(Bytes);
    test_primitives!(String);
    test_primitives!(Uuid);
    test_primitives!(BigDecimal);
    test_primitives!(Date);
    test_primitives!(Duration);
    test_primitives!(TimeMicros);
    test_primitives!(TimeMillis);
    test_primitives!(TimestampMicros);
    test_primitives!(TimestampMillis);
    test_primitives!(TimestampNanos);
    test_primitives!(LocalTimestampMicros);
    test_primitives!(LocalTimestampMillis);
    test_primitives!(LocalTimestampNanos);

    #[test]
    fn test_avro_3939_compare_named_schemata_with_different_names() {
        let schema_one = Schema::Ref {
            name: Name::from("name1"),
        };
        let schema_two = Schema::Ref {
            name: Name::from("name2"),
        };

        let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one, &schema_two);
        assert!(!specification_eq_res);
        let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one, &schema_two);
        assert!(!struct_field_eq_res);

        assert_eq!(specification_eq_res, struct_field_eq_res);
    }

    #[test]
    fn test_avro_3939_compare_schemata_not_including_attributes() {
        let schema_one = boolean_map_with_attribute("key1");
        let schema_two = boolean_map_with_attribute("key2");

        // `STRUCT_FIELD_EQ` has `include_attributes: false`, so the differing keys are ignored.
        assert!(STRUCT_FIELD_EQ.compare(&schema_one, &schema_two));
    }

    #[test]
    fn test_avro_3939_compare_schemata_including_attributes() {
        let struct_field_eq = StructFieldEq {
            include_attributes: true,
        };
        let schema_one = boolean_map_with_attribute("key1");
        let schema_two = boolean_map_with_attribute("key2");

        assert!(!struct_field_eq.compare(&schema_one, &schema_two));
    }

    #[test]
    fn test_avro_3939_compare_map_schemata() {
        let schema_one = Schema::map(Schema::Boolean);
        assert_differs_from_boolean(&schema_one);

        let schema_two = Schema::map(Schema::Boolean);
        assert_both_report_equal(&schema_one, &schema_two, "Map");
    }

    #[test]
    fn test_avro_3939_compare_array_schemata() {
        let schema_one = Schema::array(Schema::Boolean);
        assert_differs_from_boolean(&schema_one);

        let schema_two = Schema::array(Schema::Boolean);
        assert_both_report_equal(&schema_one, &schema_two, "Array");
    }

    #[test]
    fn test_avro_3939_compare_decimal_schemata() {
        let schema_one = decimal_fixture();
        assert_differs_from_boolean(&schema_one);

        let schema_two = decimal_fixture();
        assert_both_report_equal(&schema_one, &schema_two, "Decimal");
    }

    #[test]
    fn test_avro_3939_compare_fixed_schemata() {
        let schema_one = fixed_fixture();
        assert_differs_from_boolean(&schema_one);

        let schema_two = fixed_fixture();
        assert_both_report_equal(&schema_one, &schema_two, "Fixed");
    }

    #[test]
    fn test_avro_3939_compare_enum_schemata() {
        let schema_one = enum_fixture();
        assert_differs_from_boolean(&schema_one);

        let schema_two = enum_fixture();
        assert_both_report_equal(&schema_one, &schema_two, "Enum");
    }

    #[test]
    fn test_avro_3939_compare_ref_schemata() {
        let schema_one = Schema::Ref {
            name: Name::from("ref"),
        };
        assert_differs_from_boolean(&schema_one);

        let schema_two = Schema::Ref {
            name: Name::from("ref"),
        };
        assert_both_report_equal(&schema_one, &schema_two, "Ref");
    }

    #[test]
    fn test_avro_3939_compare_record_schemata() {
        let schema_one = record_fixture();
        assert_differs_from_boolean(&schema_one);

        let schema_two = record_fixture();
        assert_both_report_equal(&schema_one, &schema_two, "Record");
    }

    #[test]
    fn test_avro_3939_compare_union_schemata() -> TestResult {
        let schema_one = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
        assert_differs_from_boolean(&schema_one);

        let schema_two = Schema::Union(UnionSchema::new(vec![Schema::Boolean, Schema::Int])?);
        assert_both_report_equal(&schema_one, &schema_two, "Union");
        Ok(())
    }
}