1use crate::{
61 error::CompatibilityError,
62 schema::{
63 ArraySchema, DecimalSchema, EnumSchema, InnerDecimalSchema, MapSchema, RecordSchema,
64 Schema, UuidSchema,
65 },
66};
67use std::{
68 collections::{HashMap, hash_map::DefaultHasher},
69 hash::Hasher,
70 iter::once,
71 ops::BitAndAssign,
72 ptr,
73};
74
/// Unit type that groups the schema compatibility checks; all functionality is
/// exposed through its associated functions `can_read` and `mutual_read`.
pub struct SchemaCompatibility;
81
82impl SchemaCompatibility {
83 pub fn can_read(
85 writers_schema: &Schema,
86 readers_schema: &Schema,
87 ) -> Result<Compatibility, CompatibilityError> {
88 let mut c = Checker::new();
89 c.can_read(writers_schema, readers_schema)
90 }
91
92 pub fn mutual_read(
94 schema_a: &Schema,
95 schema_b: &Schema,
96 ) -> Result<Compatibility, CompatibilityError> {
97 let mut c = SchemaCompatibility::can_read(schema_a, schema_b)?;
98 c &= SchemaCompatibility::can_read(schema_b, schema_a)?;
99 Ok(c)
100 }
101}
102
/// Result of a successful compatibility check.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Compatibility {
    /// The checker found a match for everything it compared (for example every
    /// writer union branch or every writer enum symbol).
    Full,
    /// Only part of what the writer may produce was matched (for example only
    /// some union branches or only some enum symbols).
    Partial,
}
113
114impl BitAndAssign for Compatibility {
115 fn bitand_assign(&mut self, rhs: Self) {
123 match (*self, rhs) {
124 (Self::Full, Self::Full) => *self = Self::Full,
125 _ => *self = Self::Partial,
126 }
127 }
128}
129
/// Stateful helper that performs the actual schema matching.
struct Checker {
    /// Cache of already computed results, keyed by the pair
    /// (pointer hash of the writer schema, pointer hash of the reader schema).
    recursion: HashMap<(u64, u64), Compatibility>,
}
133
134impl Checker {
135 pub(crate) fn new() -> Self {
137 Self {
138 recursion: HashMap::new(),
139 }
140 }
141
142 pub(crate) fn full_match_schemas(
144 &mut self,
145 writers_schema: &Schema,
146 readers_schema: &Schema,
147 ) -> Result<Compatibility, CompatibilityError> {
148 let key = (
151 Self::pointer_hash(writers_schema),
152 Self::pointer_hash(readers_schema),
153 );
154
155 if let Some(c) = self.recursion.get(&key).copied() {
157 Ok(c)
158 } else {
159 let c = self.inner_full_match_schemas(writers_schema, readers_schema)?;
160 self.recursion.insert(key, c);
162 Ok(c)
163 }
164 }
165
166 fn pointer_hash(schema: &Schema) -> u64 {
168 let mut hasher = DefaultHasher::new();
169 ptr::hash(schema, &mut hasher);
170 hasher.finish()
171 }
172
/// Performs the uncached structural match of `writers_schema` against
/// `readers_schema`.
///
/// Returns `Ok(Compatibility::Full)` or `Ok(Compatibility::Partial)` when the
/// reader can consume (all or part of) what the writer produces, and a
/// [`CompatibilityError`] describing the first mismatch otherwise.
///
/// The order of the `match` arms is significant: the union arms come first,
/// and several later arms overlap (e.g. decimal/uuid/fixed), so the first
/// matching arm decides the result.
#[rustfmt::skip]
fn inner_full_match_schemas(
    &mut self,
    writers_schema: &Schema,
    readers_schema: &Schema,
) -> Result<Compatibility, CompatibilityError> {
    // When both schemas report a name, the names must be equal.
    // NOTE(review): only the `name` field is compared here; presumably the
    // namespace is deliberately ignored — confirm.
    if let Some(w_name) = writers_schema.name()
        && let Some(r_name) = readers_schema.name()
        && w_name.name != r_name.name
    {
        return Err(CompatibilityError::NameMismatch {
            writer_name: w_name.name.clone(),
            reader_name: r_name.name.clone(),
        });
    }

    match (writers_schema, readers_schema) {
        // Two references are compatible only when they refer to the same name.
        (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => {
            if r_name == w_name {
                Ok(Compatibility::Full)
            } else {
                Err(CompatibilityError::NameMismatch {
                    writer_name: w_name.fullname(None),
                    reader_name: r_name.fullname(None),
                })
            }
        }
        // Union writer, union reader: every writer branch is tried against the
        // reader branches.
        (Schema::Union(writer), Schema::Union(reader)) => {
            // `any`: at least one writer branch matched some reader branch (fully or partially).
            let mut any = false;
            // `all`: every writer branch has a fully matching reader branch.
            let mut all = true;
            for writer in &writer.schemas {
                let mut local_any = false;
                // `Iterator::any` stops at the first reader branch that matches fully;
                // partial matches are remembered but the search continues.
                all &= reader.schemas.iter().any(|reader| {
                    match self.full_match_schemas(writer, reader) {
                        Ok(Compatibility::Full) => {
                            local_any = true;
                            true
                        }
                        Ok(Compatibility::Partial) => {
                            local_any = true;
                            false
                        }
                        // Errors of individual branch pairs are discarded.
                        Err(_) => false,
                    }
                });
                any |= local_any;
            }
            if all {
                Ok(Compatibility::Full)
            } else if any {
                Ok(Compatibility::Partial)
            } else {
                Err(CompatibilityError::MissingUnionElements)
            }
        }
        // Union writer, non-union reader: each writer branch is matched against the reader.
        (Schema::Union(writer), _) => {
            let mut any = false;
            let mut all = true;
            for writer in &writer.schemas {
                match self.full_match_schemas(writer, readers_schema) {
                    Ok(Compatibility::Full) => any = true,
                    Ok(Compatibility::Partial) => {
                        any = true;
                        all = false;
                    }
                    Err(_) => {
                        all = false;
                    }
                }
            }
            if all {
                Ok(Compatibility::Full)
            } else if any {
                Ok(Compatibility::Partial)
            } else {
                Err(CompatibilityError::SchemaMismatchAllUnionElements)
            }
        }
        // Non-union writer, union reader: a single fully matching reader branch is
        // enough for `Full`; otherwise a partial match on any branch yields `Partial`.
        (_, Schema::Union(reader)) => {
            let mut partial = false;
            if reader.schemas.iter().any(|reader| {
                match self.full_match_schemas(writers_schema, reader) {
                    Ok(Compatibility::Full) => true,
                    Ok(Compatibility::Partial) => {
                        partial = true;
                        false
                    }
                    Err(_) => false,
                }
            }) {
                Ok(Compatibility::Full)
            } else if partial {
                Ok(Compatibility::Partial)
            } else {
                Err(CompatibilityError::SchemaMismatchAllUnionElements)
            }
        }
        (Schema::Null, Schema::Null) => Ok(Compatibility::Full),
        (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full),
        // Writer is `int` or one of the listed int-based variants: accepted by the
        // wider numeric readers and by every date/time/timestamp reader variant listed.
        (
            Schema::Int | Schema::Date | Schema::TimeMillis,
            Schema::Int | Schema::Long | Schema::Float | Schema::Double | Schema::Date
            | Schema::TimeMillis | Schema::TimeMicros | Schema::TimestampMillis
            | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
            | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
        ) => Ok(Compatibility::Full),
        // Writer is `long` or one of the listed long-based variants: accepted by
        // `long`, `float`, `double` and the listed long-based reader variants
        // (but not by `int`, `date` or `time-millis`).
        (
            Schema::Long | Schema::TimeMicros | Schema::TimestampMillis
            | Schema::TimestampMicros | Schema::TimestampNanos | Schema::LocalTimestampMillis
            | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
            Schema::Long | Schema::Float | Schema::Double | Schema::TimeMicros
            | Schema::TimestampMillis | Schema::TimestampMicros | Schema::TimestampNanos
            | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
            | Schema::LocalTimestampNanos,
        ) => Ok(Compatibility::Full),
        (Schema::Float, Schema::Float | Schema::Double) => Ok(Compatibility::Full),
        (Schema::Double, Schema::Double) => Ok(Compatibility::Full),
        // Decimal vs. decimal: precision and scale must be identical. This arm
        // handles every decimal/decimal pair, so the later bytes/fixed arms only
        // see a decimal when the other side is not a decimal.
        (
            Schema::Decimal(DecimalSchema { precision: w_precision, scale: w_scale, .. }),
            Schema::Decimal(DecimalSchema { precision: r_precision, scale: r_scale, .. }),
        ) => {
            if r_precision == w_precision && r_scale == w_scale {
                Ok(Compatibility::Full)
            } else {
                Err(CompatibilityError::DecimalMismatch {
                    r_precision: *r_precision,
                    r_scale: *r_scale,
                    w_precision: *w_precision,
                    w_scale: *w_scale
                })
            }
        }
        // Bytes-like and string-like schemas are treated as interchangeable.
        (
            Schema::Bytes | Schema::String | Schema::BigDecimal
            | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
            | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
            Schema::Bytes | Schema::String | Schema::BigDecimal
            | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
            | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Bytes, .. }),
        ) => Ok(Compatibility::Full),
        // Any remaining uuid/uuid pair (i.e. at least one side is fixed-backed) is accepted.
        (Schema::Uuid(_), Schema::Uuid(_)) => Ok(Compatibility::Full),
        // Fixed-backed schemas: only the sizes are compared.
        (
            Schema::Fixed(w_fixed) | Schema::Uuid(UuidSchema::Fixed(w_fixed))
            | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(w_fixed), .. })
            | Schema::Duration(w_fixed),
            Schema::Fixed(r_fixed) | Schema::Uuid(UuidSchema::Fixed(r_fixed))
            | Schema::Decimal(DecimalSchema { inner: InnerDecimalSchema::Fixed(r_fixed), .. })
            | Schema::Duration(r_fixed),
        ) => {
            if r_fixed.size == w_fixed.size {
                Ok(Compatibility::Full)
            } else {
                Err(CompatibilityError::FixedMismatch)
            }
        }
        // Arrays are compatible when their item schemas are.
        (
            Schema::Array(ArraySchema { items: w_items, .. }),
            Schema::Array(ArraySchema { items: r_items, .. }),
        ) => {
            self.full_match_schemas(w_items, r_items)
        }
        // Maps are compatible when their value schemas are.
        (
            Schema::Map(MapSchema { types: w_types, .. }),
            Schema::Map(MapSchema { types: r_types, .. }),
        ) => {
            self.full_match_schemas(w_types, r_types)
        }
        (
            Schema::Enum(EnumSchema { symbols: w_symbols, .. }),
            Schema::Enum(EnumSchema { symbols: r_symbols, default: r_default, .. }),
        ) => {
            // A reader default makes the enum fully compatible without looking at the symbols.
            if r_default.is_some() {
                Ok(Compatibility::Full)
            } else {
                // Otherwise count how many writer symbols the reader knows.
                let mut any = false;
                let mut all = true;
                w_symbols.iter().for_each(|s| {
                    let res = r_symbols.contains(s);
                    any |= res;
                    all &= res;
                });
                if all {
                    Ok(Compatibility::Full)
                } else if any {
                    Ok(Compatibility::Partial)
                } else {
                    Err(CompatibilityError::MissingSymbols)
                }
            }
        }
        (
            Schema::Record(RecordSchema { fields: w_fields, .. }),
            Schema::Record(RecordSchema { fields: r_fields, .. }),
        ) => {
            let mut compatibility = Compatibility::Full;
            // Only reader fields drive the check; writer fields the reader does not
            // mention are not inspected.
            for r_field in r_fields {
                // Look up the writer field by the reader field's name first, then by
                // each of the reader field's aliases.
                if let Some(w_field) = once(&r_field.name)
                    .chain(r_field.aliases.as_deref().unwrap_or(&[]).iter())
                    .find_map(|ra| w_fields.iter().find(|wf| &wf.name == ra))
                {
                    match self.full_match_schemas(&w_field.schema, &r_field.schema) {
                        Ok(c) => compatibility &= c,
                        Err(err) => {
                            // Wrap the nested error with the reader field's name.
                            return Err(CompatibilityError::FieldTypeMismatch(
                                r_field.name.clone(),
                                Box::new(err),
                            ));
                        }
                    }
                } else if r_field.default.is_none() {
                    // The writer never produces this field and the reader has no default.
                    return Err(CompatibilityError::MissingDefaultValue(
                        r_field.name.clone(),
                    ));
                }
            }
            Ok(compatibility)
        }
        // Every other combination is a type mismatch; both schemas are rendered
        // with their pretty `Debug` representation.
        (_, _) => Err(CompatibilityError::WrongType {
            writer_schema_type: format!("{writers_schema:#?}"),
            reader_schema_type: format!("{readers_schema:#?}"),
        }),
    }
}
435
/// Checks whether `readers_schema` can read data written with
/// `writers_schema`; this simply delegates to `full_match_schemas`.
pub(crate) fn can_read(
    &mut self,
    writers_schema: &Schema,
    readers_schema: &Schema,
) -> Result<Compatibility, CompatibilityError> {
    self.full_match_schemas(writers_schema, readers_schema)
}
443}
444
445#[cfg(test)]
446mod tests {
447 use std::collections::BTreeMap;
448
449 use super::*;
450 use crate::{
451 Codec, Decimal, Reader, Writer,
452 schema::{FixedSchema, Name, UuidSchema},
453 types::{Record, Value},
454 };
455 use apache_avro_test_helper::TestResult;
456 use rstest::*;
457
458 fn int_array_schema() -> Schema {
459 Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
460 }
461
462 fn long_array_schema() -> Schema {
463 Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
464 }
465
466 fn string_array_schema() -> Schema {
467 Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
468 }
469
470 fn int_map_schema() -> Schema {
471 Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
472 }
473
474 fn long_map_schema() -> Schema {
475 Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
476 }
477
478 fn string_map_schema() -> Schema {
479 Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
480 }
481
482 fn enum1_ab_schema() -> Schema {
483 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
484 }
485
486 fn enum1_abc_schema() -> Schema {
487 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
488 }
489
490 fn enum1_bc_schema() -> Schema {
491 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
492 }
493
494 fn enum2_ab_schema() -> Schema {
495 Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
496 }
497
498 fn empty_record1_schema() -> Schema {
499 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
500 }
501
502 fn empty_record2_schema() -> Schema {
503 Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
504 }
505
506 fn a_int_record1_schema() -> Schema {
507 Schema::parse_str(
508 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
509 )
510 .unwrap()
511 }
512
513 fn a_long_record1_schema() -> Schema {
514 Schema::parse_str(
515 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
516 )
517 .unwrap()
518 }
519
520 fn a_int_b_int_record1_schema() -> Schema {
521 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
522 }
523
524 fn a_dint_record1_schema() -> Schema {
525 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
526 }
527
528 fn a_int_b_dint_record1_schema() -> Schema {
529 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
530 }
531
532 fn a_dint_b_dint_record1_schema() -> Schema {
533 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
534 }
535
536 fn nested_record() -> Schema {
537 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
538 }
539
540 fn nested_optional_record() -> Schema {
541 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
542 }
543
544 fn int_list_record_schema() -> Schema {
545 Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
546 }
547
548 fn long_list_record_schema() -> Schema {
549 Schema::parse_str(
550 r#"
551 {
552 "type":"record", "name":"List", "fields": [
553 {"name": "head", "type": "long"},
554 {"name": "tail", "type": "array", "items": "long"}
555 ]}
556"#,
557 )
558 .unwrap()
559 }
560
561 fn union_schema(schemas: Vec<Schema>) -> Schema {
562 let schema_string = schemas
563 .iter()
564 .map(|s| s.canonical_form())
565 .collect::<Vec<String>>()
566 .join(",");
567 Schema::parse_str(&format!("[{schema_string}]")).unwrap()
568 }
569
570 fn empty_union_schema() -> Schema {
571 union_schema(vec![])
572 }
573
574 fn int_union_schema() -> Schema {
578 union_schema(vec![Schema::Int])
579 }
580
581 fn long_union_schema() -> Schema {
582 union_schema(vec![Schema::Long])
583 }
584
585 fn string_union_schema() -> Schema {
586 union_schema(vec![Schema::String])
587 }
588
589 fn int_string_union_schema() -> Schema {
590 union_schema(vec![Schema::Int, Schema::String])
591 }
592
593 fn string_int_union_schema() -> Schema {
594 union_schema(vec![Schema::String, Schema::Int])
595 }
596
/// A `[int, string]` writer read by a `[int]` reader is only partially compatible.
#[test]
fn test_broken() {
    let writer = int_string_union_schema();
    let reader = int_union_schema();
    let outcome = SchemaCompatibility::can_read(&writer, &reader).unwrap();
    assert_eq!(
        Compatibility::Partial,
        outcome,
        "Only compatible if writer writes an int"
    )
}
605
/// Runs `can_read(writer, reader)` over a table of `(reader, writer)` pairs
/// that are expected to be incompatible.
///
/// NOTE(review): the assertion uses `any`, so it passes as soon as a single
/// pair yields an error. Some pairs do not look like errors under the current
/// rules: e.g. `(Schema::TimeMicros, Schema::Int)` appears to resolve to
/// `Ok(Full)` through the int-writer arm of `inner_full_match_schemas`, and
/// the enum/union pairs can resolve to `Ok(Partial)`. Confirm whether `all`
/// (or a check that treats `Partial` separately) was intended and whether
/// the table needs updating.
#[test]
fn test_incompatible_reader_writer_pairs() {
    // Each tuple is (reader, writer).
    let incompatible_schemas = vec![
        (Schema::Null, Schema::Int),
        (Schema::Null, Schema::Long),
        (Schema::Boolean, Schema::Int),
        (Schema::Int, Schema::Null),
        (Schema::Int, Schema::Boolean),
        (Schema::Int, Schema::Long),
        (Schema::Int, Schema::Float),
        (Schema::Int, Schema::Double),
        (Schema::Long, Schema::Float),
        (Schema::Long, Schema::Double),
        (Schema::Float, Schema::Double),
        (Schema::String, Schema::Boolean),
        (Schema::String, Schema::Int),
        (Schema::Bytes, Schema::Null),
        (Schema::Bytes, Schema::Int),
        (Schema::TimeMicros, Schema::Int),
        (Schema::TimestampMillis, Schema::Int),
        (Schema::TimestampMicros, Schema::Int),
        (Schema::TimestampNanos, Schema::Int),
        (Schema::LocalTimestampMillis, Schema::Int),
        (Schema::LocalTimestampMicros, Schema::Int),
        (Schema::LocalTimestampNanos, Schema::Int),
        (Schema::Date, Schema::Long),
        (Schema::TimeMillis, Schema::Long),
        (int_array_schema(), long_array_schema()),
        (int_map_schema(), int_array_schema()),
        (int_array_schema(), int_map_schema()),
        (int_map_schema(), long_map_schema()),
        (enum1_ab_schema(), enum1_abc_schema()),
        (enum1_bc_schema(), enum1_abc_schema()),
        (enum1_ab_schema(), enum2_ab_schema()),
        (Schema::Int, enum2_ab_schema()),
        (enum2_ab_schema(), Schema::Int),
        (int_union_schema(), int_string_union_schema()),
        (string_union_schema(), int_string_union_schema()),
        (empty_record2_schema(), empty_record1_schema()),
        (a_int_record1_schema(), empty_record1_schema()),
        (a_int_b_dint_record1_schema(), empty_record1_schema()),
        (int_list_record_schema(), long_list_record_schema()),
        (nested_record(), nested_optional_record()),
    ];

    assert!(
        incompatible_schemas
            .iter()
            .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
    );
}
669
/// Parameterised check that each `(writer, reader)` JSON schema pair parses
/// and that `can_read` returns `Ok` (either `Full` or `Partial`).
#[rstest]
// Record: reader field gains a default.
#[case(
    r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
    r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
)]
// Fixed: same name and size.
#[case(
    r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
    r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
)]
// Enum: reader has an extra symbol and a default.
#[case(
    r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
    r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
)]
// Map: int values read as long.
#[case(
    r#"{"type": "map", "values": "int"}"#,
    r#"{"type": "map", "values": "long"}"#
)]
// Plain int/long writers read by logical-type readers.
#[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
#[case(
    r#"{"type": "int"}"#,
    r#"{"type": "int", "logicalType": "time-millis"}"#
)]
#[case(
    r#"{"type": "long"}"#,
    r#"{"type": "long", "logicalType": "time-micros"}"#
)]
#[case(
    r#"{"type": "long"}"#,
    r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
)]
#[case(
    r#"{"type": "long"}"#,
    r#"{"type": "long", "logicalType": "timestamp-millis"}"#
)]
#[case(
    r#"{"type": "long"}"#,
    r#"{"type": "long", "logicalType": "timestamp-micros"}"#
)]
#[case(
    r#"{"type": "long"}"#,
    r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
)]
#[case(
    r#"{"type": "long"}"#,
    r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
)]
#[case(
    r#"{"type": "long"}"#,
    r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
)]
// Array: int items read as long.
#[case(
    r#"{"type": "array", "items": "int"}"#,
    r#"{"type": "array", "items": "long"}"#
)]
fn test_avro_3950_match_schemas_ok(
    #[case] writer_schema_str: &str,
    #[case] reader_schema_str: &str,
) {
    let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
    let reader_schema = Schema::parse_str(reader_schema_str).unwrap();

    assert!(SchemaCompatibility::can_read(&writer_schema, &reader_schema).is_ok());
}
747
/// Parameterised check that each `(writer, reader)` JSON schema pair is
/// rejected by `can_read` with exactly the expected `CompatibilityError`.
#[rstest]
// Record names differ.
#[case(
    r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
    r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
    CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
)]
// Fixed sizes differ.
#[case(
    r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
    r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
    CompatibilityError::FixedMismatch
)]
// Enum names differ.
#[case(
    r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
    r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
    CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
)]
// Map/array element narrowing from long to int.
#[case(
    r#"{"type":"map", "values": "long"}"#,
    r#"{"type":"map", "values": "int"}"#,
    CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
)]
#[case(
    r#"{"type": "array", "items": "long"}"#,
    r#"{"type": "array", "items": "int"}"#,
    CompatibilityError::WrongType { writer_schema_type: "Long".to_string(), reader_schema_type: "Int".to_string() }
)]
// String writer against each date/time logical-type reader.
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "int", "logicalType": "date"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "Date".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "int", "logicalType": "time-millis"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMillis".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "long", "logicalType": "time-micros"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimeMicros".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampNanos".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMillis".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "TimestampMicros".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMillis".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampMicros".to_string() }
)]
#[case(
    r#"{"type": "string"}"#,
    r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
    CompatibilityError::WrongType { writer_schema_type: "String".to_string(), reader_schema_type: "LocalTimestampNanos".to_string() }
)]
// Record writer against fixed reader: the name check fires before the type check.
#[case(
    r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
    r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
    CompatibilityError::NameMismatch { writer_name: "record_b".to_string(), reader_name: "EmployeeId".to_string() }
)]
fn test_avro_3950_match_schemas_error(
    #[case] writer_schema_str: &str,
    #[case] reader_schema_str: &str,
    #[case] expected_error: CompatibilityError,
) {
    let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
    let reader_schema = Schema::parse_str(reader_schema_str).unwrap();

    assert_eq!(
        expected_error,
        SchemaCompatibility::can_read(&writer_schema, &reader_schema).unwrap_err()
    )
}
852
/// Every `(reader, writer)` pair in the table must be accepted by
/// `can_read(writer, reader)`; `unwrap` fails the test on the first rejection.
#[test]
fn test_compatible_reader_writer_pairs() {
    let fixed16 = FixedSchema {
        name: Name::new("uuid_fixed").unwrap(),
        aliases: None,
        doc: None,
        size: 16,
        attributes: Default::default(),
    };

    // Each tuple is (reader, writer).
    let pairs = vec![
        (Schema::Null, Schema::Null),
        (Schema::Long, Schema::Int),
        (Schema::Float, Schema::Int),
        (Schema::Float, Schema::Long),
        (Schema::Double, Schema::Long),
        (Schema::Double, Schema::Int),
        (Schema::Double, Schema::Float),
        (Schema::String, Schema::Bytes),
        (Schema::Bytes, Schema::String),
        (
            Schema::Uuid(UuidSchema::String),
            Schema::Uuid(UuidSchema::String),
        ),
        (Schema::Uuid(UuidSchema::String), Schema::String),
        (Schema::String, Schema::Uuid(UuidSchema::String)),
        (
            Schema::Uuid(UuidSchema::Bytes),
            Schema::Uuid(UuidSchema::Bytes),
        ),
        (Schema::Uuid(UuidSchema::Bytes), Schema::Bytes),
        (Schema::Bytes, Schema::Uuid(UuidSchema::Bytes)),
        (
            Schema::Uuid(UuidSchema::Fixed(fixed16.clone())),
            Schema::Uuid(UuidSchema::Fixed(fixed16.clone())),
        ),
        (
            Schema::Uuid(UuidSchema::Fixed(fixed16.clone())),
            Schema::Fixed(fixed16.clone()),
        ),
        (
            Schema::Fixed(fixed16.clone()),
            Schema::Uuid(UuidSchema::Fixed(fixed16.clone())),
        ),
        (Schema::Date, Schema::Int),
        (Schema::TimeMillis, Schema::Int),
        (Schema::TimeMicros, Schema::Long),
        (Schema::TimestampMillis, Schema::Long),
        (Schema::TimestampMicros, Schema::Long),
        (Schema::TimestampNanos, Schema::Long),
        (Schema::LocalTimestampMillis, Schema::Long),
        (Schema::LocalTimestampMicros, Schema::Long),
        (Schema::LocalTimestampNanos, Schema::Long),
        (Schema::Int, Schema::Date),
        (Schema::Int, Schema::TimeMillis),
        (Schema::Long, Schema::TimeMicros),
        (Schema::Long, Schema::TimestampMillis),
        (Schema::Long, Schema::TimestampMicros),
        (Schema::Long, Schema::TimestampNanos),
        (Schema::Long, Schema::LocalTimestampMillis),
        (Schema::Long, Schema::LocalTimestampMicros),
        (Schema::Long, Schema::LocalTimestampNanos),
        (int_array_schema(), int_array_schema()),
        (long_array_schema(), int_array_schema()),
        (int_map_schema(), int_map_schema()),
        (long_map_schema(), int_map_schema()),
        (enum1_ab_schema(), enum1_ab_schema()),
        (enum1_abc_schema(), enum1_ab_schema()),
        (empty_union_schema(), empty_union_schema()),
        (int_union_schema(), int_union_schema()),
        (int_string_union_schema(), string_int_union_schema()),
        (int_union_schema(), empty_union_schema()),
        (long_union_schema(), int_union_schema()),
        (int_union_schema(), Schema::Int),
        (Schema::Int, int_union_schema()),
        (empty_record1_schema(), empty_record1_schema()),
        (empty_record1_schema(), a_int_record1_schema()),
        (a_int_record1_schema(), a_int_record1_schema()),
        (a_dint_record1_schema(), a_int_record1_schema()),
        (a_dint_record1_schema(), a_dint_record1_schema()),
        (a_int_record1_schema(), a_dint_record1_schema()),
        (a_long_record1_schema(), a_int_record1_schema()),
        (a_int_record1_schema(), a_int_b_int_record1_schema()),
        (a_dint_record1_schema(), a_int_b_int_record1_schema()),
        (a_int_b_dint_record1_schema(), a_int_record1_schema()),
        (a_dint_b_dint_record1_schema(), empty_record1_schema()),
        (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
        (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
        (int_list_record_schema(), int_list_record_schema()),
        (long_list_record_schema(), long_list_record_schema()),
        (long_list_record_schema(), int_list_record_schema()),
        (nested_optional_record(), nested_record()),
    ];

    for (reader, writer) in &pairs {
        SchemaCompatibility::can_read(writer, reader).unwrap();
    }
}
952
953 fn writer_schema() -> Schema {
954 Schema::parse_str(
955 r#"
956 {"type":"record", "name":"Record", "fields":[
957 {"name":"oldfield1", "type":"int"},
958 {"name":"oldfield2", "type":"string"}
959 ]}
960"#,
961 )
962 .unwrap()
963 }
964
/// A reader that drops `oldfield2` can read the writer's data, but the
/// reverse direction fails because `oldfield2` has no default.
#[test]
fn test_missing_field() -> TestResult {
    let reader_json = r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"}
      ]}
"#;
    let reader = Schema::parse_str(reader_json)?;
    let writer = writer_schema();

    assert!(SchemaCompatibility::can_read(&writer, &reader).is_ok());

    let reverse_error = SchemaCompatibility::can_read(&reader, &writer).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
        reverse_error
    );

    Ok(())
}
982
/// A reader that drops `oldfield1` can read the writer's data, but the
/// reverse direction fails because `oldfield1` has no default.
#[test]
fn test_missing_second_field() -> TestResult {
    let reader_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield2", "type":"string"}
        ]}
"#;
    let reader = Schema::parse_str(reader_json)?;
    let writer = writer_schema();

    assert!(SchemaCompatibility::can_read(&writer, &reader).is_ok());

    let reverse_error = SchemaCompatibility::can_read(&reader, &writer).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
        reverse_error
    );

    Ok(())
}
1000
/// A reader with exactly the writer's fields is readable in both directions.
#[test]
fn test_all_fields() -> TestResult {
    let reader_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"oldfield2", "type":"string"}
        ]}
"#;
    let reader = Schema::parse_str(reader_json)?;
    let writer = writer_schema();

    assert!(SchemaCompatibility::can_read(&writer, &reader).is_ok());
    assert!(SchemaCompatibility::can_read(&reader, &writer).is_ok());

    Ok(())
}
1016
/// A reader field that the writer lacks is fine when it carries a default;
/// the reverse direction still fails on `oldfield2`.
#[test]
fn test_new_field_with_default() -> TestResult {
    let reader_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int", "default":42}
        ]}
"#;
    let reader = Schema::parse_str(reader_json)?;
    let writer = writer_schema();

    assert!(SchemaCompatibility::can_read(&writer, &reader).is_ok());

    let reverse_error = SchemaCompatibility::can_read(&reader, &writer).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
        reverse_error
    );

    Ok(())
}
1035
/// A reader field without default that the writer lacks breaks both directions.
#[test]
fn test_new_field() -> TestResult {
    let reader_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int"}
        ]}
"#;
    let reader = Schema::parse_str(reader_json)?;
    let writer = writer_schema();

    let forward_error = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("newfield1")),
        forward_error
    );

    let reverse_error = SchemaCompatibility::can_read(&reader, &writer).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
        reverse_error
    );

    Ok(())
}
1057
/// A string-array writer is fully readable by a string-array reader and
/// rejected with `WrongType` by a string-map reader.
#[test]
fn test_array_writer_schema() {
    let array_reader = string_array_schema();
    let map_reader = string_map_schema();

    let accepted = SchemaCompatibility::can_read(&string_array_schema(), &array_reader).unwrap();
    assert_eq!(Compatibility::Full, accepted);

    let rejected = SchemaCompatibility::can_read(&string_array_schema(), &map_reader);
    assert!(matches!(
        rejected,
        Err(CompatibilityError::WrongType { .. }),
    ));
}
1072
/// `string` reads `string`; an `int` writer with a `string` reader is a `WrongType` error.
#[test]
fn test_primitive_writer_schema() {
    let string_reader = Schema::String;
    assert!(SchemaCompatibility::can_read(&Schema::String, &string_reader).is_ok());

    let expected = CompatibilityError::WrongType {
        writer_schema_type: "Int".to_string(),
        reader_schema_type: "String".to_string(),
    };
    let actual = SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err();
    assert_eq!(expected, actual);
}
1085
/// `[int, string]` written and `[string]` read is only partially compatible;
/// the opposite direction is fully compatible.
#[test]
fn test_union_reader_writer_subset_incompatibility() {
    let wide_union = union_schema(vec![Schema::Int, Schema::String]);
    let narrow_union = union_schema(vec![Schema::String]);

    let wide_to_narrow = SchemaCompatibility::can_read(&wide_union, &narrow_union).unwrap();
    assert_eq!(Compatibility::Partial, wide_to_narrow);

    let narrow_to_wide = SchemaCompatibility::can_read(&narrow_union, &wide_union).unwrap();
    assert_eq!(Compatibility::Full, narrow_to_wide);
}
1101
/// A field type mismatch inside a record is reported as `FieldTypeMismatch`
/// wrapping the underlying `WrongType` error.
#[test]
fn test_incompatible_record_field() -> TestResult {
    let string_field_json = r#"
        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
            {"name":"field1", "type":"string"}
        ]}
    "#;
    let int_field_json = r#"
        {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
            {"name":"field1", "type":"int"}
        ]}
    "#;
    let string_schema = Schema::parse_str(string_field_json)?;
    let int_schema = Schema::parse_str(int_field_json)?;

    let expected = CompatibilityError::FieldTypeMismatch(
        "field1".to_owned(),
        Box::new(CompatibilityError::WrongType {
            writer_schema_type: "String".to_string(),
            reader_schema_type: "Int".to_string(),
        }),
    );
    let actual = SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err();
    assert_eq!(expected, actual);

    Ok(())
}
1133
/// Reading a three-symbol enum with a two-symbol reader is `Partial`;
/// reading a two-symbol enum with a three-symbol reader is `Full`.
#[test]
fn test_enum_symbols() -> TestResult {
    let two_symbols = Schema::parse_str(
        r#"
      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
"#,
    )?;
    let three_symbols =
        Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;

    let narrowing = SchemaCompatibility::can_read(&three_symbols, &two_symbols)?;
    assert_eq!(Compatibility::Partial, narrowing);

    let widening = SchemaCompatibility::can_read(&two_symbols, &three_symbols)?;
    assert_eq!(Compatibility::Full, widening);

    Ok(())
}
1154
1155 fn point_2d_schema() -> Schema {
1156 Schema::parse_str(
1157 r#"
1158 {"type":"record", "name":"Point2D", "fields":[
1159 {"name":"x", "type":"double"},
1160 {"name":"y", "type":"double"}
1161 ]}
1162 "#,
1163 )
1164 .unwrap()
1165 }
1166
1167 fn point_2d_fullname_schema() -> Schema {
1168 Schema::parse_str(
1169 r#"
1170 {"type":"record", "name":"Point", "namespace":"written", "fields":[
1171 {"name":"x", "type":"double"},
1172 {"name":"y", "type":"double"}
1173 ]}
1174 "#,
1175 )
1176 .unwrap()
1177 }
1178
1179 fn point_3d_no_default_schema() -> Schema {
1180 Schema::parse_str(
1181 r#"
1182 {"type":"record", "name":"Point", "fields":[
1183 {"name":"x", "type":"double"},
1184 {"name":"y", "type":"double"},
1185 {"name":"z", "type":"double"}
1186 ]}
1187 "#,
1188 )
1189 .unwrap()
1190 }
1191
1192 fn point_3d_schema() -> Schema {
1193 Schema::parse_str(
1194 r#"
1195 {"type":"record", "name":"Point3D", "fields":[
1196 {"name":"x", "type":"double"},
1197 {"name":"y", "type":"double"},
1198 {"name":"z", "type":"double", "default": 0.0}
1199 ]}
1200 "#,
1201 )
1202 .unwrap()
1203 }
1204
1205 fn point_3d_match_name_schema() -> Schema {
1206 Schema::parse_str(
1207 r#"
1208 {"type":"record", "name":"Point", "fields":[
1209 {"name":"x", "type":"double"},
1210 {"name":"y", "type":"double"},
1211 {"name":"z", "type":"double", "default": 0.0}
1212 ]}
1213 "#,
1214 )
1215 .unwrap()
1216 }
1217
/// A reader union of `null` and the three-field `Point` without defaults
/// cannot read the `written.Point` 2D record: every branch is rejected.
#[test]
fn test_union_resolution_no_structure_match() {
    let reader_union = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
    let writer_record = point_2d_fullname_schema();

    let failure = SchemaCompatibility::can_read(&writer_record, &reader_union).unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, failure);
}
1227
/// Reader union lists `Point2D` before `Point3D`; the `written.Point` writer
/// record is still rejected against every branch.
#[test]
fn test_union_resolution_first_structure_match_2d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_2d_schema(),
        point_3d_schema(),
    ];
    let reader_union = union_schema(branches);
    let writer_record = point_2d_fullname_schema();

    let failure = SchemaCompatibility::can_read(&writer_record, &reader_union).unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, failure);
}
1242
/// Reader union lists `Point3D` before `Point2D`; the `written.Point` writer
/// record is still rejected against every branch.
#[test]
fn test_union_resolution_first_structure_match_3d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_3d_schema(),
        point_2d_schema(),
    ];
    let reader_union = union_schema(branches);
    let writer_record = point_2d_fullname_schema();

    let failure = SchemaCompatibility::can_read(&writer_record, &reader_union).unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, failure);
}
1257
/// Reader union contains a namespace-less `Point` record with a defaulted
/// `z` field; the test expects the `written.Point` writer record to be
/// rejected against every branch.
#[test]
fn test_union_resolution_named_structure_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
    ];
    let reader_union = union_schema(branches);
    let writer_record = point_2d_fullname_schema();

    let failure = SchemaCompatibility::can_read(&writer_record, &reader_union).unwrap_err();
    assert_eq!(CompatibilityError::SchemaMismatchAllUnionElements, failure);
}
1272
/// When the reader union also contains the exact `written.Point` schema, the
/// writer record can be read.
#[test]
fn test_union_resolution_full_name_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
        point_2d_fullname_schema(),
    ];
    let reader_union = union_schema(branches);
    let writer_record = point_2d_fullname_schema();

    let outcome = SchemaCompatibility::can_read(&writer_record, &reader_union);
    assert!(outcome.is_ok());
}
1285
/// AVRO-3772: the writer emits the enum symbol `clubs`, which the reader's
/// `suit` enum does not contain. Reading with the reader schema is expected
/// to yield the reader enum's default symbol, `spades` (index 1).
///
/// Fallible calls are propagated with `?` so a failure is reported through
/// the returned `TestResult` rather than a bare `unwrap` panic.
#[test]
fn test_avro_3772_enum_default() -> TestResult {
    let writer_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "clubs", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;

    let reader_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "ninja", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;

    // Serialize one record using the writer schema.
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
    let mut record =
        Record::new(writer.schema()).expect("the writer schema is a record schema");
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "clubs");
    writer.append_value(record)?;
    let input = writer.into_inner()?;

    // Read it back through the reader schema.
    let mut reader = Reader::builder(&input[..])
        .reader_schema(&reader_schema)
        .build()?;
    assert_eq!(
        reader.next().expect("exactly one record was written")?,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(1, "spades".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1351
/// AVRO-3772: the reader's `suit` enum has fewer symbols (`hearts`, `spades`)
/// than the writer's. The written symbol `hearts` exists in the reader enum,
/// so it is expected to resolve to `hearts` at the reader's index 0.
///
/// Fallible calls are propagated with `?` so a failure is reported through
/// the returned `TestResult` rather than a bare `unwrap` panic.
#[test]
fn test_avro_3772_enum_default_less_symbols() -> TestResult {
    let writer_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "clubs", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;

    let reader_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["hearts", "spades"],
            "default": "spades"
          }
        }
      ]
    }
    "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;

    // Serialize one record using the writer schema.
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
    let mut record =
        Record::new(writer.schema()).expect("the writer schema is a record schema");
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "hearts");
    writer.append_value(record)?;
    let input = writer.into_inner()?;

    // Read it back through the reader schema.
    let mut reader = Reader::builder(&input[..])
        .reader_schema(&reader_schema)
        .build()?;
    assert_eq!(
        reader.next().expect("exactly one record was written")?,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(0, "hearts".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1417
/// AVRO-3894: adding an alias (`time`) to the `date` field must not break
/// compatibility; the two `advdaba.Conference` schemas can read each other.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
{
    let without_alias_raw = r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "date"}
      ]
    }"#;
    let without_alias = Schema::parse_str(without_alias_raw)?;

    let with_alias_raw = r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "date", "aliases" : [ "time" ]}
      ]
    }"#;
    let with_alias = Schema::parse_str(with_alias_raw)?;

    let both_directions = SchemaCompatibility::mutual_read(&without_alias, &with_alias);
    assert!(both_directions.is_ok());

    Ok(())
}
1451
/// AVRO-3917: a reader whose `date` field carries the alias `time` can fully
/// read a writer that names the field `time`. The opposite direction fails
/// with `MissingDefaultValue("time")`.
#[test]
fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
    let aliased_raw = r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "date", "aliases" : [ "time" ]}
      ]
    }"#;
    let aliased = Schema::parse_str(aliased_raw)?;

    let renamed_raw = r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "time"}
      ]
    }"#;
    let renamed = Schema::parse_str(renamed_raw)?;

    // Writer uses `time`; the reader's alias bridges the rename.
    let via_alias = SchemaCompatibility::can_read(&renamed, &aliased)?;
    assert_eq!(Compatibility::Full, via_alias);

    // Writer uses `date`; the reader's `time` field has no alias and no default.
    let missing_default = SchemaCompatibility::can_read(&aliased, &renamed).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("time")),
        missing_default
    );

    Ok(())
}
1491
/// AVRO-3898: for each named schema kind exercised here (record, enum, fixed),
/// a writer without a namespace and a reader in `my.namespace` sharing the
/// same unqualified name are expected to be readable.
#[test]
fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
    // Each entry is (writer schema, reader schema).
    let writer_reader_pairs = [
        // Records
        (
            Schema::parse_str(
                r#"{
                  "type": "record",
                  "name": "Statistics",
                  "fields": [
                    { "name": "success", "type": "int" },
                    { "name": "fail", "type": "int" },
                    { "name": "time", "type": "string" },
                    { "name": "max", "type": "int", "default": 0 }
                  ]
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                  "type": "record",
                  "name": "Statistics",
                  "namespace": "my.namespace",
                  "fields": [
                    { "name": "success", "type": "int" },
                    { "name": "fail", "type": "int" },
                    { "name": "time", "type": "string" },
                    { "name": "average", "type": "int", "default": 0}
                  ]
                }"#,
            )?,
        ),
        // Enums
        (
            Schema::parse_str(
                r#"{
                  "type": "enum",
                  "name": "Suit",
                  "symbols": ["diamonds", "spades", "clubs"]
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                  "type": "enum",
                  "name": "Suit",
                  "namespace": "my.namespace",
                  "symbols": ["diamonds", "spades", "clubs", "hearts"]
                }"#,
            )?,
        ),
        // Fixed
        (
            Schema::parse_str(
                r#"{
                  "type": "fixed",
                  "name": "EmployeeId",
                  "size": 16
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                  "type": "fixed",
                  "name": "EmployeeId",
                  "namespace": "my.namespace",
                  "size": 16
                }"#,
            )?,
        ),
    ];

    for (unqualified, qualified) in &writer_reader_pairs {
        let outcome = SchemaCompatibility::can_read(unqualified, qualified);
        assert!(outcome.is_ok());
    }

    Ok(())
}
1567
/// For a map field and an array field, compares a record whose field is the
/// plain container with one whose field is `["null", container]`.
/// Plain writer to nullable reader is `Full`; nullable writer to plain reader
/// is `Partial`.
#[test]
fn test_can_read_compatibility_errors() -> TestResult {
    // Each entry is (record with plain field, record with nullable field).
    let plain_nullable_pairs = [
        (
            Schema::parse_str(
                r#"{
                    "type": "record",
                    "name": "StatisticsMap",
                    "fields": [
                        {"name": "average", "type": "int", "default": 0},
                        {"name": "success", "type": {"type": "map", "values": "int"}}
                    ]
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                    "type": "record",
                    "name": "StatisticsMap",
                    "fields": [
                        {"name": "average", "type": "int", "default": 0},
                        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
                    ]
                }"#,
            )?,
        ),
        (
            Schema::parse_str(
                r#"{
                    "type": "record",
                    "name": "StatisticsArray",
                    "fields": [
                        {"name": "max_values", "type": {"type": "array", "items": "int"}}
                    ]
                }"#,
            )?,
            Schema::parse_str(
                r#"{
                    "type": "record",
                    "name": "StatisticsArray",
                    "fields": [
                        {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
                    ]
                }"#,
            )?,
        ),
    ];

    for (plain, nullable) in &plain_nullable_pairs {
        let plain_to_nullable = SchemaCompatibility::can_read(plain, nullable).unwrap();
        assert_eq!(Compatibility::Full, plain_to_nullable);

        let nullable_to_plain = SchemaCompatibility::can_read(nullable, plain).unwrap();
        assert_eq!(Compatibility::Partial, nullable_to_plain);
    }

    Ok(())
}
1628
/// AVRO-3974: a schema that refers to another named schema (`Parent.child`
/// is of type `avro.Child`) must be checkable against itself without error.
///
/// Parse failures are propagated with `?`, consistent with the rest of the
/// function, instead of panicking through `unwrap`.
#[test]
fn avro_3974_can_read_schema_references() -> TestResult {
    let schema_strs = vec![
        r#"{
          "type": "record",
          "name": "Child",
          "namespace": "avro",
          "fields": [
            {
              "name": "val",
              "type": "int"
            }
          ]
        }
        "#,
        r#"{
          "type": "record",
          "name": "Parent",
          "namespace": "avro",
          "fields": [
            {
              "name": "child",
              "type": "avro.Child"
            }
          ]
        }
        "#,
    ];

    let schemas = Schema::parse_list(schema_strs)?;
    // Index 1 is `Parent`, the schema that contains the reference.
    SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;

    Ok(())
}
1663
/// A 25-byte `fixed` and a 12-byte `fixed` with logical type `duration` must
/// be rejected in both directions, while each schema can read itself.
///
/// Parse failures are propagated with `?`, consistent with the rest of the
/// function, instead of panicking through `unwrap`.
#[test]
fn duration_and_fixed_of_different_size() -> TestResult {
    let schema_strs = vec![
        r#"{
          "type": "fixed",
          "name": "Fixed25",
          "size": 25
        }
        "#,
        r#"{
          "type": "fixed",
          "logicalType": "duration",
          "name": "Duration",
          "size": 12
        }
        "#,
    ];

    let schemas = Schema::parse_list(schema_strs)?;
    let (fixed_25, duration) = (&schemas[0], &schemas[1]);

    // Cross-reading must fail both ways.
    assert!(SchemaCompatibility::can_read(fixed_25, duration).is_err());
    assert!(SchemaCompatibility::can_read(duration, fixed_25).is_err());

    // Self-reading must succeed.
    SchemaCompatibility::can_read(duration, duration)?;
    SchemaCompatibility::can_read(fixed_25, fixed_25)?;

    Ok(())
}
1690
/// avro-rs #342: a decimal backed by `bytes` and a decimal backed by `fixed`
/// with the same precision and scale are mutually fully compatible, and the
/// same decimal value resolves to equal values against either schema.
#[test]
fn avro_rs_342_decimal_fixed_and_bytes() -> TestResult {
    let bytes_backed = Schema::Decimal(DecimalSchema {
        precision: 20,
        scale: 0,
        inner: InnerDecimalSchema::Bytes,
    });

    let fixed_inner = FixedSchema {
        name: Name::new("DecimalFixed")?,
        aliases: None,
        doc: None,
        size: 20,
        attributes: BTreeMap::default(),
    };
    let fixed_backed = Schema::Decimal(DecimalSchema {
        precision: 20,
        scale: 0,
        inner: InnerDecimalSchema::Fixed(fixed_inner),
    });

    let mutual = SchemaCompatibility::mutual_read(&bytes_backed, &fixed_backed)?;
    assert_eq!(Compatibility::Full, mutual);

    let decimal = Value::Decimal(Decimal::from(vec![1; 10]));
    let resolved_as_fixed = decimal.clone().resolve(&fixed_backed)?;
    let resolved_as_bytes = decimal.resolve(&bytes_backed)?;

    assert_eq!(resolved_as_fixed, resolved_as_bytes);

    Ok(())
}
1723}