1use crate::{
20 error::CompatibilityError,
21 schema::{
22 ArraySchema, DecimalSchema, EnumSchema, InnerDecimalSchema, MapSchema, RecordSchema,
23 Schema, UuidSchema,
24 },
25};
26use std::{
27 collections::{HashMap, hash_map::DefaultHasher},
28 hash::Hasher,
29 iter::once,
30 ops::BitAndAssign,
31 ptr,
32};
33
34pub struct SchemaCompatibility;
35
36#[derive(Debug, Copy, Clone, Eq, PartialEq)]
38pub enum Compatibility {
39 Full,
41 Partial,
45}
46
47impl BitAndAssign for Compatibility {
48 fn bitand_assign(&mut self, rhs: Self) {
56 match (*self, rhs) {
57 (Self::Full, Self::Full) => *self = Self::Full,
58 _ => *self = Self::Partial,
59 }
60 }
61}
62
63struct Checker {
64 recursion: HashMap<(u64, u64), Compatibility>,
65}
66
67impl Checker {
68 pub(crate) fn new() -> Self {
70 Self {
71 recursion: HashMap::new(),
72 }
73 }
74
75 pub(crate) fn full_match_schemas(
77 &mut self,
78 writers_schema: &Schema,
79 readers_schema: &Schema,
80 ) -> Result<Compatibility, CompatibilityError> {
81 let key = (
84 Self::pointer_hash(writers_schema),
85 Self::pointer_hash(readers_schema),
86 );
87
88 if let Some(c) = self.recursion.get(&key).copied() {
90 Ok(c)
91 } else {
92 let c = self.inner_full_match_schemas(writers_schema, readers_schema)?;
93 self.recursion.insert(key, c);
95 Ok(c)
96 }
97 }
98
99 fn pointer_hash(schema: &Schema) -> u64 {
101 let mut hasher = DefaultHasher::new();
102 ptr::hash(schema, &mut hasher);
103 hasher.finish()
104 }
105
106 #[rustfmt::skip]
110 fn inner_full_match_schemas(
111 &mut self,
112 writers_schema: &Schema,
113 readers_schema: &Schema,
114 ) -> Result<Compatibility, CompatibilityError> {
115 if let Some(w_name) = writers_schema.name()
117 && let Some(r_name) = readers_schema.name()
118 && w_name.name != r_name.name
119 {
120 return Err(CompatibilityError::NameMismatch {
121 writer_name: w_name.name.clone(),
122 reader_name: r_name.name.clone(),
123 });
124 }
125
126 match (writers_schema, readers_schema) {
128 (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => {
129 if r_name == w_name {
130 Ok(Compatibility::Full)
131 } else {
132 Err(CompatibilityError::NameMismatch {
133 writer_name: w_name.fullname(None),
134 reader_name: r_name.fullname(None),
135 })
136 }
137 }
138 (Schema::Union(writer), Schema::Union(reader)) => {
139 let mut any = false;
140 let mut all = true;
141 for writer in &writer.schemas {
142 let mut local_any = false;
145 all &= reader.schemas.iter().any(|reader| {
146 match self.full_match_schemas(writer, reader) {
147 Ok(Compatibility::Full) => {
148 local_any = true;
149 true
150 }
151 Ok(Compatibility::Partial) => {
152 local_any = true;
153 false
154 }
155 Err(_) => false,
156 }
157 });
158 any |= local_any;
160 }
161 if all {
162 Ok(Compatibility::Full)
164 } else if any {
165 Ok(Compatibility::Partial)
167 } else {
168 Err(CompatibilityError::MissingUnionElements)
169 }
170 }
171 (Schema::Union(writer), _) => {
172 let mut any = false;
175 let mut all = true;
176 for writer in &writer.schemas {
177 match self.full_match_schemas(writer, readers_schema) {
178 Ok(Compatibility::Full) => any = true,
179 Ok(Compatibility::Partial) => {
180 any = true;
181 all = false;
182 }
183 Err(_) => {
184 all = false;
185 }
186 }
187 }
188 if all {
189 Ok(Compatibility::Full)
191 } else if any {
192 Ok(Compatibility::Partial)
194 } else {
195 Err(CompatibilityError::SchemaMismatchAllUnionElements)
196 }
197 }
198 (_, Schema::Union(reader)) => {
199 let mut partial = false;
202 if reader.schemas.iter().any(|reader| {
203 match self.full_match_schemas(writers_schema, reader) {
204 Ok(Compatibility::Full) => true,
205 Ok(Compatibility::Partial) => {
206 partial = true;
207 false
208 }
209 Err(_) => false,
210 }
211 }) {
212 Ok(Compatibility::Full)
214 } else if partial {
215 Ok(Compatibility::Partial)
217 } else {
218 Err(CompatibilityError::SchemaMismatchAllUnionElements)
219 }
220 }
221 (Schema::Null, Schema::Null) => Ok(Compatibility::Full),
222 (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full),
223 (
225 Schema::Int | Schema::Date | Schema::TimeMillis,
226 Schema::Int | Schema::Long | Schema::Float | Schema::Double | Schema::Date
227 | Schema::TimeMillis | Schema::TimeMicros | Schema::TimestampMillis
228 | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
229 | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
230 ) => Ok(Compatibility::Full),
231 (
233 Schema::Long | Schema::TimeMicros | Schema::TimestampMillis
234 | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
235 | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
236 Schema::Long | Schema::Float | Schema::Double | Schema::TimeMicros
237 | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos
238 | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
239 | Schema::LocalTimestampNanos,
240 ) => Ok(Compatibility::Full),
241 (Schema::Float, Schema::Float | Schema::Double) => Ok(Compatibility::Full),
243 (Schema::Double, Schema::Double) => Ok(Compatibility::Full),
244 (
248 Schema::Decimal(DecimalSchema { precision: w_precision, scale: w_scale, .. }),
249 Schema::Decimal(DecimalSchema { precision: r_precision, scale: r_scale, .. }),
250 ) => {
251 if r_precision == w_precision && r_scale == w_scale {
253 Ok(Compatibility::Full)
254 } else {
255 Err(CompatibilityError::DecimalMismatch {
256 r_precision: *r_precision,
257 r_scale: *r_scale,
258 w_precision: *w_precision,
259 w_scale: *w_scale
260 })
261 }
262 }
263 (
265 Schema::Bytes | Schema::String | Schema::BigDecimal
266 | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
267 | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
268 Schema::Bytes | Schema::String | Schema::BigDecimal
269 | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
270 | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
271 ) => Ok(Compatibility::Full),
272 (Schema::Uuid(_), Schema::Uuid(_)) => Ok(Compatibility::Full),
273 (
274 Schema::Fixed(w_fixed) | Schema::Uuid(UuidSchema::Fixed(w_fixed))
275 | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(w_fixed), .. })
276 | Schema::Duration(w_fixed),
277 Schema::Fixed(r_fixed) | Schema::Uuid(UuidSchema::Fixed(r_fixed))
278 | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(r_fixed), .. })
279 | Schema::Duration(r_fixed),
280 ) => {
281 if r_fixed.size == w_fixed.size {
283 Ok(Compatibility::Full)
284 } else {
285 Err(CompatibilityError::FixedMismatch)
286 }
287 }
288 (
289 Schema::Array(ArraySchema { items: w_items, .. }),
290 Schema::Array(ArraySchema { items: r_items, .. }),
291 ) => {
292 self.full_match_schemas(w_items, r_items)
294 }
295 (
296 Schema::Map(MapSchema { types: w_types, .. }),
297 Schema::Map(MapSchema { types: r_types, .. }),
298 ) => {
299 self.full_match_schemas(w_types, r_types)
301 }
302 (
303 Schema::Enum(EnumSchema { symbols: w_symbols, .. }),
304 Schema::Enum(EnumSchema { symbols: r_symbols, default: r_default, .. }),
305 ) => {
306 if r_default.is_some() {
308 Ok(Compatibility::Full)
310 } else {
311 let mut any = false;
312 let mut all = true;
313 w_symbols.iter().for_each(|s| {
314 let res = r_symbols.contains(s);
315 any |= res;
316 all &= res;
317 });
318 if all {
319 Ok(Compatibility::Full)
321 } else if any {
322 Ok(Compatibility::Partial)
324 } else {
325 Err(CompatibilityError::MissingSymbols)
327 }
328 }
329 }
330 (
331 Schema::Record(RecordSchema { fields: w_fields, .. }),
332 Schema::Record(RecordSchema { fields: r_fields, .. }),
333 ) => {
334 let mut compatibility = Compatibility::Full;
335 for r_field in r_fields {
336 if let Some(w_field) = once(&r_field.name)
340 .chain(r_field.aliases.as_deref().unwrap_or(&[]).iter())
341 .find_map(|ra| w_fields.iter().find(|wf| &wf.name == ra))
342 {
343 match self.full_match_schemas(&w_field.schema, &r_field.schema) {
345 Ok(c) => compatibility &= c,
346 Err(err) => {
347 return Err(CompatibilityError::FieldTypeMismatch(
348 r_field.name.clone(),
349 Box::new(err),
350 ));
351 }
352 }
353 } else if r_field.default.is_none() {
354 return Err(CompatibilityError::MissingDefaultValue(
356 r_field.name.clone(),
357 ));
358 }
359 }
360 Ok(compatibility)
361 }
362 (_, _) => Err(CompatibilityError::WrongType {
363 writer_schema_type: format!("{writers_schema:#?}"),
364 reader_schema_type: format!("{readers_schema:#?}"),
365 }),
366 }
367 }
368
369 pub(crate) fn can_read(
370 &mut self,
371 writers_schema: &Schema,
372 readers_schema: &Schema,
373 ) -> Result<Compatibility, CompatibilityError> {
374 self.full_match_schemas(writers_schema, readers_schema)
375 }
376}
377
378impl SchemaCompatibility {
379 pub fn can_read(
382 writers_schema: &Schema,
383 readers_schema: &Schema,
384 ) -> Result<Compatibility, CompatibilityError> {
385 let mut c = Checker::new();
386 c.can_read(writers_schema, readers_schema)
387 }
388
389 pub fn mutual_read(
392 writers_schema: &Schema,
393 readers_schema: &Schema,
394 ) -> Result<Compatibility, CompatibilityError> {
395 let mut c = SchemaCompatibility::can_read(writers_schema, readers_schema)?;
396 c &= SchemaCompatibility::can_read(readers_schema, writers_schema)?;
397 Ok(c)
398 }
399}
400
401#[cfg(test)]
402mod tests {
403 use std::collections::BTreeMap;
404
405 use super::*;
406 use crate::{
407 Codec, Decimal, Reader, Writer,
408 schema::{FixedSchema, Name, UuidSchema},
409 types::{Record, Value},
410 };
411 use apache_avro_test_helper::TestResult;
412 use rstest::*;
413
414 fn int_array_schema() -> Schema {
415 Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
416 }
417
418 fn long_array_schema() -> Schema {
419 Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
420 }
421
422 fn string_array_schema() -> Schema {
423 Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
424 }
425
426 fn int_map_schema() -> Schema {
427 Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
428 }
429
430 fn long_map_schema() -> Schema {
431 Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
432 }
433
434 fn string_map_schema() -> Schema {
435 Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
436 }
437
438 fn enum1_ab_schema() -> Schema {
439 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
440 }
441
442 fn enum1_abc_schema() -> Schema {
443 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
444 }
445
446 fn enum1_bc_schema() -> Schema {
447 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
448 }
449
450 fn enum2_ab_schema() -> Schema {
451 Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
452 }
453
454 fn empty_record1_schema() -> Schema {
455 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
456 }
457
458 fn empty_record2_schema() -> Schema {
459 Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
460 }
461
462 fn a_int_record1_schema() -> Schema {
463 Schema::parse_str(
464 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
465 )
466 .unwrap()
467 }
468
469 fn a_long_record1_schema() -> Schema {
470 Schema::parse_str(
471 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
472 )
473 .unwrap()
474 }
475
476 fn a_int_b_int_record1_schema() -> Schema {
477 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
478 }
479
480 fn a_dint_record1_schema() -> Schema {
481 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
482 }
483
484 fn a_int_b_dint_record1_schema() -> Schema {
485 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
486 }
487
488 fn a_dint_b_dint_record1_schema() -> Schema {
489 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
490 }
491
492 fn nested_record() -> Schema {
493 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
494 }
495
496 fn nested_optional_record() -> Schema {
497 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
498 }
499
500 fn int_list_record_schema() -> Schema {
501 Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
502 }
503
504 fn long_list_record_schema() -> Schema {
505 Schema::parse_str(
506 r#"
507 {
508 "type":"record", "name":"List", "fields": [
509 {"name": "head", "type": "long"},
510 {"name": "tail", "type": "array", "items": "long"}
511 ]}
512"#,
513 )
514 .unwrap()
515 }
516
517 fn union_schema(schemas: Vec<Schema>) -> Schema {
518 let schema_string = schemas
519 .iter()
520 .map(|s| s.canonical_form())
521 .collect::<Vec<String>>()
522 .join(",");
523 Schema::parse_str(&format!("[{schema_string}]")).unwrap()
524 }
525
526 fn empty_union_schema() -> Schema {
527 union_schema(vec![])
528 }
529
530 fn int_union_schema() -> Schema {
534 union_schema(vec![Schema::Int])
535 }
536
537 fn long_union_schema() -> Schema {
538 union_schema(vec![Schema::Long])
539 }
540
541 fn string_union_schema() -> Schema {
542 union_schema(vec![Schema::String])
543 }
544
545 fn int_string_union_schema() -> Schema {
546 union_schema(vec![Schema::Int, Schema::String])
547 }
548
549 fn string_int_union_schema() -> Schema {
550 union_schema(vec![Schema::String, Schema::Int])
551 }
552
553 #[test]
554 fn test_broken() {
555 assert_eq!(
556 Compatibility::Partial,
557 SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema()).unwrap(),
558 "Only compatible if writer writes an int"
559 )
560 }
561
562 #[test]
563 fn test_incompatible_reader_writer_pairs() {
564 let incompatible_schemas = vec![
565 (Schema::Null, Schema::Int),
567 (Schema::Null, Schema::Long),
568 (Schema::Boolean, Schema::Int),
570 (Schema::Int, Schema::Null),
572 (Schema::Int, Schema::Boolean),
573 (Schema::Int, Schema::Long),
574 (Schema::Int, Schema::Float),
575 (Schema::Int, Schema::Double),
576 (Schema::Long, Schema::Float),
578 (Schema::Long, Schema::Double),
579 (Schema::Float, Schema::Double),
581 (Schema::String, Schema::Boolean),
583 (Schema::String, Schema::Int),
584 (Schema::Bytes, Schema::Null),
586 (Schema::Bytes, Schema::Int),
587 (Schema::TimeMicros, Schema::Int),
589 (Schema::TimestampMillis, Schema::Int),
590 (Schema::TimestampMicros, Schema::Int),
591 (Schema::TimestampNanos, Schema::Int),
592 (Schema::LocalTimestampMillis, Schema::Int),
593 (Schema::LocalTimestampMicros, Schema::Int),
594 (Schema::LocalTimestampNanos, Schema::Int),
595 (Schema::Date, Schema::Long),
596 (Schema::TimeMillis, Schema::Long),
597 (int_array_schema(), long_array_schema()),
599 (int_map_schema(), int_array_schema()),
600 (int_array_schema(), int_map_schema()),
601 (int_map_schema(), long_map_schema()),
602 (enum1_ab_schema(), enum1_abc_schema()),
604 (enum1_bc_schema(), enum1_abc_schema()),
605 (enum1_ab_schema(), enum2_ab_schema()),
606 (Schema::Int, enum2_ab_schema()),
607 (enum2_ab_schema(), Schema::Int),
608 (int_union_schema(), int_string_union_schema()),
610 (string_union_schema(), int_string_union_schema()),
611 (empty_record2_schema(), empty_record1_schema()),
613 (a_int_record1_schema(), empty_record1_schema()),
614 (a_int_b_dint_record1_schema(), empty_record1_schema()),
615 (int_list_record_schema(), long_list_record_schema()),
616 (nested_record(), nested_optional_record()),
617 ];
618
619 assert!(
620 incompatible_schemas
621 .iter()
622 .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
623 );
624 }
625
626 #[rstest]
627 #[case(
629 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
630 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
631 )]
632 #[case(
634 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
635 r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
636 )]
637 #[case(
639 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
640 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
641 )]
642 #[case(
644 r#"{"type": "map", "values": "int"}"#,
645 r#"{"type": "map", "values": "long"}"#
646 )]
647 #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
649 #[case(
651 r#"{"type": "int"}"#,
652 r#"{"type": "int", "logicalType": "time-millis"}"#
653 )]
654 #[case(
656 r#"{"type": "long"}"#,
657 r#"{"type": "long", "logicalType": "time-micros"}"#
658 )]
659 #[case(
661 r#"{"type": "long"}"#,
662 r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
663 )]
664 #[case(
666 r#"{"type": "long"}"#,
667 r#"{"type": "long", "logicalType": "timestamp-millis"}"#
668 )]
669 #[case(
671 r#"{"type": "long"}"#,
672 r#"{"type": "long", "logicalType": "timestamp-micros"}"#
673 )]
674 #[case(
676 r#"{"type": "long"}"#,
677 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
678 )]
679 #[case(
681 r#"{"type": "long"}"#,
682 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
683 )]
684 #[case(
686 r#"{"type": "long"}"#,
687 r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
688 )]
689 #[case(
691 r#"{"type": "array", "items": "int"}"#,
692 r#"{"type": "array", "items": "long"}"#
693 )]
694 fn test_avro_3950_match_schemas_ok(
695 #[case] writer_schema_str: &str,
696 #[case] reader_schema_str: &str,
697 ) {
698 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
699 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
700
701 assert!(SchemaCompatibility::can_read(&writer_schema, &reader_schema).is_ok());
702 }
703
704 #[rstest]
705 #[case(
707 r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
708 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
709 CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
710 )]
711 #[case(
713 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
714 r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
715 CompatibilityError::FixedMismatch
716 )]
717 #[case(
719 r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
720 r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
721 CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
722 )]
723 #[case(
725 r#"{"type":"map", "values": "long"}"#,
726 r#"{"type":"map", "values": "int"}"#,
727 CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
728 )]
729 #[case(
731 r#"{"type": "array", "items": "long"}"#,
732 r#"{"type": "array", "items": "int"}"#,
733 CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
734 )]
735 #[case(
737 r#"{"type": "string"}"#,
738 r#"{"type": "int", "logicalType": "date"}"#,
739 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "Date".to_string() }
740 )]
741 #[case(
743 r#"{"type": "string"}"#,
744 r#"{"type": "int", "logicalType": "time-millis"}"#,
745 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMillis".to_string() }
746 )]
747 #[case(
749 r#"{"type": "string"}"#,
750 r#"{"type": "long", "logicalType": "time-micros"}"#,
751 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMicros".to_string() }
752 )]
753 #[case(
755 r#"{"type": "string"}"#,
756 r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
757 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampNanos".to_string() }
758 )]
759 #[case(
761 r#"{"type": "string"}"#,
762 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
763 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMillis".to_string() }
764 )]
765 #[case(
767 r#"{"type": "string"}"#,
768 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
769 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMicros".to_string() }
770 )]
771 #[case(
773 r#"{"type": "string"}"#,
774 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
775 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMillis".to_string() }
776 )]
777 #[case(
779 r#"{"type": "string"}"#,
780 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
781 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMicros".to_string() }
782 )]
783 #[case(
785 r#"{"type": "string"}"#,
786 r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
787 CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampNanos".to_string() }
788 )]
789 #[case(
791 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
792 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
793 CompatibilityError::NameMismatch { writer_name: "record_b".to_string(), reader_name: "EmployeeId".to_string() }
794 )]
795 fn test_avro_3950_match_schemas_error(
796 #[case] writer_schema_str: &str,
797 #[case] reader_schema_str: &str,
798 #[case] expected_error: CompatibilityError,
799 ) {
800 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
801 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
802
803 assert_eq!(
804 expected_error,
805 SchemaCompatibility::can_read(&writer_schema, &reader_schema).unwrap_err()
806 )
807 }
808
809 #[test]
810 fn test_compatible_reader_writer_pairs() {
811 let uuid_fixed = FixedSchema {
812 name: Name::new("uuid_fixed").unwrap(),
813 aliases: None,
814 doc: None,
815 size: 16,
816 default: None,
817 attributes: Default::default(),
818 };
819
820 let compatible_schemas = vec![
821 (Schema::Null, Schema::Null),
822 (Schema::Long, Schema::Int),
823 (Schema::Float, Schema::Int),
824 (Schema::Float, Schema::Long),
825 (Schema::Double, Schema::Long),
826 (Schema::Double, Schema::Int),
827 (Schema::Double, Schema::Float),
828 (Schema::String, Schema::Bytes),
829 (Schema::Bytes, Schema::String),
830 (
832 Schema::Uuid(UuidSchema::String),
833 Schema::Uuid(UuidSchema::String),
834 ),
835 (Schema::Uuid(UuidSchema::String), Schema::String),
836 (Schema::String, Schema::Uuid(UuidSchema::String)),
837 (
838 Schema::Uuid(UuidSchema::Bytes),
839 Schema::Uuid(UuidSchema::Bytes),
840 ),
841 (Schema::Uuid(UuidSchema::Bytes), Schema::Bytes),
842 (Schema::Bytes, Schema::Uuid(UuidSchema::Bytes)),
843 (
844 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
845 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
846 ),
847 (
848 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
849 Schema::Fixed(uuid_fixed.clone()),
850 ),
851 (
852 Schema::Fixed(uuid_fixed.clone()),
853 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
854 ),
855 (Schema::Date, Schema::Int),
856 (Schema::TimeMillis, Schema::Int),
857 (Schema::TimeMicros, Schema::Long),
858 (Schema::TimestampMillis, Schema::Long),
859 (Schema::TimestampMicros, Schema::Long),
860 (Schema::TimestampNanos, Schema::Long),
861 (Schema::LocalTimestampMillis, Schema::Long),
862 (Schema::LocalTimestampMicros, Schema::Long),
863 (Schema::LocalTimestampNanos, Schema::Long),
864 (Schema::Int, Schema::Date),
865 (Schema::Int, Schema::TimeMillis),
866 (Schema::Long, Schema::TimeMicros),
867 (Schema::Long, Schema::TimestampMillis),
868 (Schema::Long, Schema::TimestampMicros),
869 (Schema::Long, Schema::TimestampNanos),
870 (Schema::Long, Schema::LocalTimestampMillis),
871 (Schema::Long, Schema::LocalTimestampMicros),
872 (Schema::Long, Schema::LocalTimestampNanos),
873 (int_array_schema(), int_array_schema()),
874 (long_array_schema(), int_array_schema()),
875 (int_map_schema(), int_map_schema()),
876 (long_map_schema(), int_map_schema()),
877 (enum1_ab_schema(), enum1_ab_schema()),
878 (enum1_abc_schema(), enum1_ab_schema()),
879 (empty_union_schema(), empty_union_schema()),
880 (int_union_schema(), int_union_schema()),
881 (int_string_union_schema(), string_int_union_schema()),
882 (int_union_schema(), empty_union_schema()),
883 (long_union_schema(), int_union_schema()),
884 (int_union_schema(), Schema::Int),
885 (Schema::Int, int_union_schema()),
886 (empty_record1_schema(), empty_record1_schema()),
887 (empty_record1_schema(), a_int_record1_schema()),
888 (a_int_record1_schema(), a_int_record1_schema()),
889 (a_dint_record1_schema(), a_int_record1_schema()),
890 (a_dint_record1_schema(), a_dint_record1_schema()),
891 (a_int_record1_schema(), a_dint_record1_schema()),
892 (a_long_record1_schema(), a_int_record1_schema()),
893 (a_int_record1_schema(), a_int_b_int_record1_schema()),
894 (a_dint_record1_schema(), a_int_b_int_record1_schema()),
895 (a_int_b_dint_record1_schema(), a_int_record1_schema()),
896 (a_dint_b_dint_record1_schema(), empty_record1_schema()),
897 (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
898 (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
899 (int_list_record_schema(), int_list_record_schema()),
900 (long_list_record_schema(), long_list_record_schema()),
901 (long_list_record_schema(), int_list_record_schema()),
902 (nested_optional_record(), nested_record()),
903 ];
904
905 for (reader, writer) in compatible_schemas {
906 SchemaCompatibility::can_read(&writer, &reader).unwrap();
907 }
908 }
909
910 fn writer_schema() -> Schema {
911 Schema::parse_str(
912 r#"
913 {"type":"record", "name":"Record", "fields":[
914 {"name":"oldfield1", "type":"int"},
915 {"name":"oldfield2", "type":"string"}
916 ]}
917"#,
918 )
919 .unwrap()
920 }
921
922 #[test]
923 fn test_missing_field() -> TestResult {
924 let reader_schema = Schema::parse_str(
925 r#"
926 {"type":"record", "name":"Record", "fields":[
927 {"name":"oldfield1", "type":"int"}
928 ]}
929"#,
930 )?;
931 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok());
932 assert_eq!(
933 CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
934 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
935 );
936
937 Ok(())
938 }
939
940 #[test]
941 fn test_missing_second_field() -> TestResult {
942 let reader_schema = Schema::parse_str(
943 r#"
944 {"type":"record", "name":"Record", "fields":[
945 {"name":"oldfield2", "type":"string"}
946 ]}
947"#,
948 )?;
949 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
950 assert_eq!(
951 CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
952 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
953 );
954
955 Ok(())
956 }
957
958 #[test]
959 fn test_all_fields() -> TestResult {
960 let reader_schema = Schema::parse_str(
961 r#"
962 {"type":"record", "name":"Record", "fields":[
963 {"name":"oldfield1", "type":"int"},
964 {"name":"oldfield2", "type":"string"}
965 ]}
966"#,
967 )?;
968 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
969 assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok());
970
971 Ok(())
972 }
973
974 #[test]
975 fn test_new_field_with_default() -> TestResult {
976 let reader_schema = Schema::parse_str(
977 r#"
978 {"type":"record", "name":"Record", "fields":[
979 {"name":"oldfield1", "type":"int"},
980 {"name":"newfield1", "type":"int", "default":42}
981 ]}
982"#,
983 )?;
984 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
985 assert_eq!(
986 CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
987 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
988 );
989
990 Ok(())
991 }
992
993 #[test]
994 fn test_new_field() -> TestResult {
995 let reader_schema = Schema::parse_str(
996 r#"
997 {"type":"record", "name":"Record", "fields":[
998 {"name":"oldfield1", "type":"int"},
999 {"name":"newfield1", "type":"int"}
1000 ]}
1001"#,
1002 )?;
1003 assert_eq!(
1004 CompatibilityError::MissingDefaultValue(String::from("newfield1")),
1005 SchemaCompatibility::can_read(&writer_schema(), &reader_schema).unwrap_err()
1006 );
1007 assert_eq!(
1008 CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1009 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1010 );
1011
1012 Ok(())
1013 }
1014
1015 #[test]
1016 fn test_array_writer_schema() {
1017 let valid_reader = string_array_schema();
1018 let invalid_reader = string_map_schema();
1019
1020 assert_eq!(
1021 Compatibility::Full,
1022 SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).unwrap()
1023 );
1024 assert!(matches!(
1025 SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader),
1026 Err(CompatibilityError::WrongType { .. }),
1027 ));
1028 }
1029
1030 #[test]
1031 fn test_primitive_writer_schema() {
1032 let valid_reader = Schema::String;
1033 assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok());
1034 assert_eq!(
1035 CompatibilityError::WrongType {
1036 writer_schema_type: "Int".to_string(),
1037 reader_schema_type: "String".to_string()
1038 },
1039 SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err()
1040 );
1041 }
1042
1043 #[test]
1044 fn test_union_reader_writer_subset_incompatibility() {
1045 let union_writer = union_schema(vec![Schema::Int, Schema::String]);
1047 let union_reader = union_schema(vec![Schema::String]);
1048
1049 assert_eq!(
1050 Compatibility::Partial,
1051 SchemaCompatibility::can_read(&union_writer, &union_reader).unwrap()
1052 );
1053 assert_eq!(
1054 Compatibility::Full,
1055 SchemaCompatibility::can_read(&union_reader, &union_writer).unwrap()
1056 );
1057 }
1058
1059 #[test]
1060 fn test_incompatible_record_field() -> TestResult {
1061 let string_schema = Schema::parse_str(
1062 r#"
1063 {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1064 {"name":"field1", "type":"string"}
1065 ]}
1066 "#,
1067 )?;
1068
1069 let int_schema = Schema::parse_str(
1070 r#"
1071 {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
1072 {"name":"field1", "type":"int"}
1073 ]}
1074 "#,
1075 )?;
1076
1077 assert_eq!(
1078 CompatibilityError::FieldTypeMismatch(
1079 "field1".to_owned(),
1080 Box::new(CompatibilityError::WrongType {
1081 writer_schema_type: "String".to_string(),
1082 reader_schema_type: "Int".to_string()
1083 })
1084 ),
1085 SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err()
1086 );
1087
1088 Ok(())
1089 }
1090
1091 #[test]
1092 fn test_enum_symbols() -> TestResult {
1093 let enum_schema1 = Schema::parse_str(
1094 r#"
1095 {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
1096"#,
1097 )?;
1098 let enum_schema2 =
1099 Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;
1100 assert_eq!(
1101 Compatibility::Partial,
1102 SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)?
1103 );
1104 assert_eq!(
1105 Compatibility::Full,
1106 SchemaCompatibility::can_read(&enum_schema1, &enum_schema2)?
1107 );
1108
1109 Ok(())
1110 }
1111
1112 fn point_2d_schema() -> Schema {
1113 Schema::parse_str(
1114 r#"
1115 {"type":"record", "name":"Point2D", "fields":[
1116 {"name":"x", "type":"double"},
1117 {"name":"y", "type":"double"}
1118 ]}
1119 "#,
1120 )
1121 .unwrap()
1122 }
1123
1124 fn point_2d_fullname_schema() -> Schema {
1125 Schema::parse_str(
1126 r#"
1127 {"type":"record", "name":"Point", "namespace":"written", "fields":[
1128 {"name":"x", "type":"double"},
1129 {"name":"y", "type":"double"}
1130 ]}
1131 "#,
1132 )
1133 .unwrap()
1134 }
1135
1136 fn point_3d_no_default_schema() -> Schema {
1137 Schema::parse_str(
1138 r#"
1139 {"type":"record", "name":"Point", "fields":[
1140 {"name":"x", "type":"double"},
1141 {"name":"y", "type":"double"},
1142 {"name":"z", "type":"double"}
1143 ]}
1144 "#,
1145 )
1146 .unwrap()
1147 }
1148
1149 fn point_3d_schema() -> Schema {
1150 Schema::parse_str(
1151 r#"
1152 {"type":"record", "name":"Point3D", "fields":[
1153 {"name":"x", "type":"double"},
1154 {"name":"y", "type":"double"},
1155 {"name":"z", "type":"double", "default": 0.0}
1156 ]}
1157 "#,
1158 )
1159 .unwrap()
1160 }
1161
1162 fn point_3d_match_name_schema() -> Schema {
1163 Schema::parse_str(
1164 r#"
1165 {"type":"record", "name":"Point", "fields":[
1166 {"name":"x", "type":"double"},
1167 {"name":"y", "type":"double"},
1168 {"name":"z", "type":"double", "default": 0.0}
1169 ]}
1170 "#,
1171 )
1172 .unwrap()
1173 }
1174
1175 #[test]
1176 fn test_union_resolution_no_structure_match() {
1177 let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
1179 assert_eq!(
1180 CompatibilityError::SchemaMismatchAllUnionElements,
1181 SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1182 );
1183 }
1184
1185 #[test]
1186 fn test_union_resolution_first_structure_match_2d() {
1187 let read_schema = union_schema(vec![
1189 Schema::Null,
1190 point_3d_no_default_schema(),
1191 point_2d_schema(),
1192 point_3d_schema(),
1193 ]);
1194 assert_eq!(
1195 CompatibilityError::SchemaMismatchAllUnionElements,
1196 SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1197 );
1198 }
1199
1200 #[test]
1201 fn test_union_resolution_first_structure_match_3d() {
1202 let read_schema = union_schema(vec![
1204 Schema::Null,
1205 point_3d_no_default_schema(),
1206 point_3d_schema(),
1207 point_2d_schema(),
1208 ]);
1209 assert_eq!(
1210 CompatibilityError::SchemaMismatchAllUnionElements,
1211 SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1212 );
1213 }
1214
1215 #[test]
1216 fn test_union_resolution_named_structure_match() {
1217 let read_schema = union_schema(vec![
1219 Schema::Null,
1220 point_2d_schema(),
1221 point_3d_match_name_schema(),
1222 point_3d_schema(),
1223 ]);
1224 assert_eq!(
1225 CompatibilityError::SchemaMismatchAllUnionElements,
1226 SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).unwrap_err()
1227 );
1228 }
1229
1230 #[test]
1231 fn test_union_resolution_full_name_match() {
1232 let read_schema = union_schema(vec![
1234 Schema::Null,
1235 point_2d_schema(),
1236 point_3d_match_name_schema(),
1237 point_3d_schema(),
1238 point_2d_fullname_schema(),
1239 ]);
1240 assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).is_ok());
1241 }
1242
1243 #[test]
1244 fn test_avro_3772_enum_default() -> TestResult {
1245 let writer_raw_schema = r#"
1246 {
1247 "type": "record",
1248 "name": "test",
1249 "fields": [
1250 {"name": "a", "type": "long", "default": 42},
1251 {"name": "b", "type": "string"},
1252 {
1253 "name": "c",
1254 "type": {
1255 "type": "enum",
1256 "name": "suit",
1257 "symbols": ["diamonds", "spades", "clubs", "hearts"],
1258 "default": "spades"
1259 }
1260 }
1261 ]
1262 }
1263 "#;
1264
1265 let reader_raw_schema = r#"
1266 {
1267 "type": "record",
1268 "name": "test",
1269 "fields": [
1270 {"name": "a", "type": "long", "default": 42},
1271 {"name": "b", "type": "string"},
1272 {
1273 "name": "c",
1274 "type": {
1275 "type": "enum",
1276 "name": "suit",
1277 "symbols": ["diamonds", "spades", "ninja", "hearts"],
1278 "default": "spades"
1279 }
1280 }
1281 ]
1282 }
1283 "#;
1284 let writer_schema = Schema::parse_str(writer_raw_schema)?;
1285 let reader_schema = Schema::parse_str(reader_raw_schema)?;
1286 let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
1287 let mut record = Record::new(writer.schema()).unwrap();
1288 record.put("a", 27i64);
1289 record.put("b", "foo");
1290 record.put("c", "clubs");
1291 writer.append_value(record).unwrap();
1292 let input = writer.into_inner()?;
1293 let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1294 assert_eq!(
1295 reader.next().unwrap().unwrap(),
1296 Value::Record(vec![
1297 ("a".to_string(), Value::Long(27)),
1298 ("b".to_string(), Value::String("foo".to_string())),
1299 ("c".to_string(), Value::Enum(1, "spades".to_string())),
1300 ])
1301 );
1302 assert!(reader.next().is_none());
1303
1304 Ok(())
1305 }
1306
1307 #[test]
1308 fn test_avro_3772_enum_default_less_symbols() -> TestResult {
1309 let writer_raw_schema = r#"
1310 {
1311 "type": "record",
1312 "name": "test",
1313 "fields": [
1314 {"name": "a", "type": "long", "default": 42},
1315 {"name": "b", "type": "string"},
1316 {
1317 "name": "c",
1318 "type": {
1319 "type": "enum",
1320 "name": "suit",
1321 "symbols": ["diamonds", "spades", "clubs", "hearts"],
1322 "default": "spades"
1323 }
1324 }
1325 ]
1326 }
1327 "#;
1328
1329 let reader_raw_schema = r#"
1330 {
1331 "type": "record",
1332 "name": "test",
1333 "fields": [
1334 {"name": "a", "type": "long", "default": 42},
1335 {"name": "b", "type": "string"},
1336 {
1337 "name": "c",
1338 "type": {
1339 "type": "enum",
1340 "name": "suit",
1341 "symbols": ["hearts", "spades"],
1342 "default": "spades"
1343 }
1344 }
1345 ]
1346 }
1347 "#;
1348 let writer_schema = Schema::parse_str(writer_raw_schema)?;
1349 let reader_schema = Schema::parse_str(reader_raw_schema)?;
1350 let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
1351 let mut record = Record::new(writer.schema()).unwrap();
1352 record.put("a", 27i64);
1353 record.put("b", "foo");
1354 record.put("c", "hearts");
1355 writer.append_value(record).unwrap();
1356 let input = writer.into_inner()?;
1357 let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
1358 assert_eq!(
1359 reader.next().unwrap().unwrap(),
1360 Value::Record(vec![
1361 ("a".to_string(), Value::Long(27)),
1362 ("b".to_string(), Value::String("foo".to_string())),
1363 ("c".to_string(), Value::Enum(0, "hearts".to_string())),
1364 ])
1365 );
1366 assert!(reader.next().is_none());
1367
1368 Ok(())
1369 }
1370
1371 #[test]
1372 fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
1373 {
1374 let schema_v1 = Schema::parse_str(
1375 r#"
1376 {
1377 "type": "record",
1378 "name": "Conference",
1379 "namespace": "advdaba",
1380 "fields": [
1381 {"type": "string", "name": "name"},
1382 {"type": "long", "name": "date"}
1383 ]
1384 }"#,
1385 )?;
1386
1387 let schema_v2 = Schema::parse_str(
1388 r#"
1389 {
1390 "type": "record",
1391 "name": "Conference",
1392 "namespace": "advdaba",
1393 "fields": [
1394 {"type": "string", "name": "name"},
1395 {"type": "long", "name": "date", "aliases" : [ "time" ]}
1396 ]
1397 }"#,
1398 )?;
1399
1400 assert!(SchemaCompatibility::mutual_read(&schema_v1, &schema_v2).is_ok());
1401
1402 Ok(())
1403 }
1404
1405 #[test]
1406 fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
1407 let schema_v1 = Schema::parse_str(
1408 r#"
1409 {
1410 "type": "record",
1411 "name": "Conference",
1412 "namespace": "advdaba",
1413 "fields": [
1414 {"type": "string", "name": "name"},
1415 {"type": "long", "name": "date", "aliases" : [ "time" ]}
1416 ]
1417 }"#,
1418 )?;
1419
1420 let schema_v2 = Schema::parse_str(
1421 r#"
1422 {
1423 "type": "record",
1424 "name": "Conference",
1425 "namespace": "advdaba",
1426 "fields": [
1427 {"type": "string", "name": "name"},
1428 {"type": "long", "name": "time"}
1429 ]
1430 }"#,
1431 )?;
1432
1433 assert_eq!(
1434 Compatibility::Full,
1435 SchemaCompatibility::can_read(&schema_v2, &schema_v1)?
1436 );
1437 assert_eq!(
1438 CompatibilityError::MissingDefaultValue(String::from("time")),
1439 SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err()
1440 );
1441
1442 Ok(())
1443 }
1444
1445 #[test]
1446 fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
1447 let schemas = [
1448 (
1450 Schema::parse_str(
1451 r#"{
1452 "type": "record",
1453 "name": "Statistics",
1454 "fields": [
1455 { "name": "success", "type": "int" },
1456 { "name": "fail", "type": "int" },
1457 { "name": "time", "type": "string" },
1458 { "name": "max", "type": "int", "default": 0 }
1459 ]
1460 }"#,
1461 )?,
1462 Schema::parse_str(
1463 r#"{
1464 "type": "record",
1465 "name": "Statistics",
1466 "namespace": "my.namespace",
1467 "fields": [
1468 { "name": "success", "type": "int" },
1469 { "name": "fail", "type": "int" },
1470 { "name": "time", "type": "string" },
1471 { "name": "average", "type": "int", "default": 0}
1472 ]
1473 }"#,
1474 )?,
1475 ),
1476 (
1478 Schema::parse_str(
1479 r#"{
1480 "type": "enum",
1481 "name": "Suit",
1482 "symbols": ["diamonds", "spades", "clubs"]
1483 }"#,
1484 )?,
1485 Schema::parse_str(
1486 r#"{
1487 "type": "enum",
1488 "name": "Suit",
1489 "namespace": "my.namespace",
1490 "symbols": ["diamonds", "spades", "clubs", "hearts"]
1491 }"#,
1492 )?,
1493 ),
1494 (
1496 Schema::parse_str(
1497 r#"{
1498 "type": "fixed",
1499 "name": "EmployeeId",
1500 "size": 16
1501 }"#,
1502 )?,
1503 Schema::parse_str(
1504 r#"{
1505 "type": "fixed",
1506 "name": "EmployeeId",
1507 "namespace": "my.namespace",
1508 "size": 16
1509 }"#,
1510 )?,
1511 ),
1512 ];
1513
1514 for (schema_1, schema_2) in schemas {
1515 assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok());
1516 }
1517
1518 Ok(())
1519 }
1520
1521 #[test]
1522 fn test_can_read_compatibility_errors() -> TestResult {
1523 let schemas = [
1524 (
1525 Schema::parse_str(
1526 r#"{
1527 "type": "record",
1528 "name": "StatisticsMap",
1529 "fields": [
1530 {"name": "average", "type": "int", "default": 0},
1531 {"name": "success", "type": {"type": "map", "values": "int"}}
1532 ]
1533 }"#,
1534 )?,
1535 Schema::parse_str(
1536 r#"{
1537 "type": "record",
1538 "name": "StatisticsMap",
1539 "fields": [
1540 {"name": "average", "type": "int", "default": 0},
1541 {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
1542 ]
1543 }"#,
1544 )?,
1545 ),
1546 (
1547 Schema::parse_str(
1548 r#"{
1549 "type": "record",
1550 "name": "StatisticsArray",
1551 "fields": [
1552 {"name": "max_values", "type": {"type": "array", "items": "int"}}
1553 ]
1554 }"#,
1555 )?,
1556 Schema::parse_str(
1557 r#"{
1558 "type": "record",
1559 "name": "StatisticsArray",
1560 "fields": [
1561 {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
1562 ]
1563 }"#,
1564 )?,
1565 ),
1566 ];
1567
1568 for (schema_1, schema_2) in schemas {
1569 assert_eq!(
1570 Compatibility::Full,
1571 SchemaCompatibility::can_read(&schema_1, &schema_2).unwrap()
1572 );
1573 assert_eq!(
1574 Compatibility::Partial,
1575 SchemaCompatibility::can_read(&schema_2, &schema_1).unwrap()
1576 );
1577 }
1578
1579 Ok(())
1580 }
1581
1582 #[test]
1583 fn avro_3974_can_read_schema_references() -> TestResult {
1584 let schema_strs = vec![
1585 r#"{
1586 "type": "record",
1587 "name": "Child",
1588 "namespace": "avro",
1589 "fields": [
1590 {
1591 "name": "val",
1592 "type": "int"
1593 }
1594 ]
1595 }
1596 "#,
1597 r#"{
1598 "type": "record",
1599 "name": "Parent",
1600 "namespace": "avro",
1601 "fields": [
1602 {
1603 "name": "child",
1604 "type": "avro.Child"
1605 }
1606 ]
1607 }
1608 "#,
1609 ];
1610
1611 let schemas = Schema::parse_list(schema_strs).unwrap();
1612 SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1613
1614 Ok(())
1615 }
1616
1617 #[test]
1618 fn duration_and_fixed_of_different_size() -> TestResult {
1619 let schema_strs = vec![
1620 r#"{
1621 "type": "fixed",
1622 "name": "Fixed25",
1623 "size": 25
1624 }
1625 "#,
1626 r#"{
1627 "type": "fixed",
1628 "logicalType": "duration",
1629 "name": "Duration",
1630 "size": 12
1631 }
1632 "#,
1633 ];
1634
1635 let schemas = Schema::parse_list(schema_strs).unwrap();
1636 assert!(SchemaCompatibility::can_read(&schemas[0], &schemas[1]).is_err());
1637 assert!(SchemaCompatibility::can_read(&schemas[1], &schemas[0]).is_err());
1638 SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
1639 SchemaCompatibility::can_read(&schemas[0], &schemas[0])?;
1640
1641 Ok(())
1642 }
1643
1644 #[test]
1645 fn avro_rs_342_decimal_fixed_and_bytes() -> TestResult {
1646 let bytes = Schema::Decimal(DecimalSchema {
1647 precision: 20,
1648 scale: 0,
1649 inner: InnerDecimalSchema::Bytes,
1650 });
1651 let fixed = Schema::Decimal(DecimalSchema {
1652 precision: 20,
1653 scale: 0,
1654 inner: InnerDecimalSchema::Fixed(FixedSchema {
1655 name: Name::new("DecimalFixed")?,
1656 aliases: None,
1657 doc: None,
1658 size: 20,
1659 default: None,
1660 attributes: BTreeMap::default(),
1661 }),
1662 });
1663
1664 assert_eq!(
1665 Compatibility::Full,
1666 SchemaCompatibility::mutual_read(&bytes, &fixed)?
1667 );
1668
1669 let value = Value::Decimal(Decimal::from(vec![1; 10]));
1670 let fixed_value = value.clone().resolve(&fixed)?;
1671 let bytes_value = value.resolve(&bytes)?;
1672
1673 assert_eq!(fixed_value, bytes_value);
1674
1675 Ok(())
1676 }
1677}