1use crate::schema::UuidSchema;
20use crate::{
21 error::CompatibilityError,
22 schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
23};
24use std::{
25 collections::{HashSet, hash_map::DefaultHasher},
26 hash::Hasher,
27 ptr,
28};
29
30fn match_ref_schemas(
31 writers_schema: &Schema,
32 readers_schema: &Schema,
33) -> Result<(), CompatibilityError> {
34 match (readers_schema, writers_schema) {
35 (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
36 if r_name == w_name {
37 Ok(())
38 } else {
39 Err(CompatibilityError::NameMismatch {
40 writer_name: w_name.fullname(None),
41 reader_name: r_name.fullname(None),
42 })
43 }
44 }
45 _ => Err(CompatibilityError::WrongType {
46 writer_schema_type: format!("{writers_schema:#?}"),
47 reader_schema_type: format!("{readers_schema:#?}"),
48 }),
49 }
50}
51
/// Unit type that groups the schema compatibility checks (`can_read`,
/// `mutual_read` and `match_schemas`) as associated functions.
pub struct SchemaCompatibility;
53
/// Stateful helper that walks a writer/reader schema pair recursively.
struct Checker {
    /// Hashes of the (writer, reader) schema addresses that were already
    /// visited; used to stop the walk from re-checking the same pair.
    recursion: HashSet<(u64, u64)>,
}
57
58impl Checker {
59 pub(crate) fn new() -> Self {
61 Self {
62 recursion: HashSet::new(),
63 }
64 }
65
66 pub(crate) fn can_read(
67 &mut self,
68 writers_schema: &Schema,
69 readers_schema: &Schema,
70 ) -> Result<(), CompatibilityError> {
71 self.full_match_schemas(writers_schema, readers_schema)
72 }
73
74 pub(crate) fn full_match_schemas(
75 &mut self,
76 writers_schema: &Schema,
77 readers_schema: &Schema,
78 ) -> Result<(), CompatibilityError> {
79 if self.recursion_in_progress(writers_schema, readers_schema) {
80 return Ok(());
81 }
82
83 SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
84
85 let w_type = SchemaKind::from(writers_schema);
86 let r_type = SchemaKind::from(readers_schema);
87
88 if w_type != SchemaKind::Union
89 && (r_type.is_primitive()
90 || r_type == SchemaKind::Fixed
91 || r_type == SchemaKind::Uuid
92 || r_type == SchemaKind::Date
93 || r_type == SchemaKind::TimeMillis
94 || r_type == SchemaKind::TimeMicros
95 || r_type == SchemaKind::TimestampMillis
96 || r_type == SchemaKind::TimestampMicros
97 || r_type == SchemaKind::TimestampNanos
98 || r_type == SchemaKind::LocalTimestampMillis
99 || r_type == SchemaKind::LocalTimestampMicros
100 || r_type == SchemaKind::LocalTimestampNanos)
101 {
102 return Ok(());
103 }
104
105 match r_type {
106 SchemaKind::Ref => match_ref_schemas(writers_schema, readers_schema),
107 SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
108 SchemaKind::Map => {
109 if let Schema::Map(w_m) = writers_schema {
110 match readers_schema {
111 Schema::Map(r_m) => self.full_match_schemas(&w_m.types, &r_m.types),
112 _ => Err(CompatibilityError::WrongType {
113 writer_schema_type: format!("{writers_schema:#?}"),
114 reader_schema_type: format!("{readers_schema:#?}"),
115 }),
116 }
117 } else {
118 Err(CompatibilityError::TypeExpected {
119 schema_type: String::from("writers_schema"),
120 expected_type: vec![SchemaKind::Record],
121 })
122 }
123 }
124 SchemaKind::Array => {
125 if let Schema::Array(w_a) = writers_schema {
126 match readers_schema {
127 Schema::Array(r_a) => self.full_match_schemas(&w_a.items, &r_a.items),
128 _ => Err(CompatibilityError::WrongType {
129 writer_schema_type: format!("{writers_schema:#?}"),
130 reader_schema_type: format!("{readers_schema:#?}"),
131 }),
132 }
133 } else {
134 Err(CompatibilityError::TypeExpected {
135 schema_type: String::from("writers_schema"),
136 expected_type: vec![SchemaKind::Array],
137 })
138 }
139 }
140 SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
141 SchemaKind::Enum => {
142 if let Schema::Enum(EnumSchema {
144 symbols: w_symbols, ..
145 }) = writers_schema
146 {
147 if let Schema::Enum(EnumSchema {
148 symbols: r_symbols, ..
149 }) = readers_schema
150 {
151 if w_symbols.iter().all(|e| r_symbols.contains(e)) {
152 return Ok(());
153 }
154 }
155 }
156 Err(CompatibilityError::MissingSymbols)
157 }
158 _ => {
159 if w_type == SchemaKind::Union {
160 if let Schema::Union(r) = writers_schema {
161 if r.schemas.len() == 1 {
162 return self.full_match_schemas(&r.schemas[0], readers_schema);
163 }
164 }
165 }
166 Err(CompatibilityError::Inconclusive(String::from(
167 "writers_schema",
168 )))
169 }
170 }
171 }
172
173 fn match_record_schemas(
174 &mut self,
175 writers_schema: &Schema,
176 readers_schema: &Schema,
177 ) -> Result<(), CompatibilityError> {
178 let w_type = SchemaKind::from(writers_schema);
179
180 if w_type == SchemaKind::Union {
181 return Err(CompatibilityError::TypeExpected {
182 schema_type: String::from("writers_schema"),
183 expected_type: vec![SchemaKind::Record],
184 });
185 }
186
187 if let Schema::Record(RecordSchema {
188 fields: w_fields,
189 lookup: w_lookup,
190 ..
191 }) = writers_schema
192 {
193 if let Schema::Record(RecordSchema {
194 fields: r_fields, ..
195 }) = readers_schema
196 {
197 for field in r_fields.iter() {
198 let mut fields_names = vec![&field.name];
200 if let Some(ref aliases) = field.aliases {
201 for alias in aliases {
202 fields_names.push(alias);
203 }
204 }
205
206 let position = fields_names.iter().find_map(|field_name| {
210 if let Some(pos) = w_lookup.get(*field_name) {
211 if &w_fields[*pos].name == *field_name {
212 return Some(pos);
213 }
214 }
215 None
216 });
217
218 match position {
219 Some(pos) => {
220 if let Err(err) =
221 self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
222 {
223 return Err(CompatibilityError::FieldTypeMismatch(
224 field.name.clone(),
225 Box::new(err),
226 ));
227 }
228 }
229 _ => {
230 if field.default.is_none() {
231 return Err(CompatibilityError::MissingDefaultValue(
232 field.name.clone(),
233 ));
234 }
235 }
236 }
237 }
238 }
239 }
240 Ok(())
241 }
242
243 fn match_union_schemas(
244 &mut self,
245 writers_schema: &Schema,
246 readers_schema: &Schema,
247 ) -> Result<(), CompatibilityError> {
248 if let Schema::Union(u) = writers_schema {
249 if u.schemas
250 .iter()
251 .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok())
252 {
253 return Ok(());
254 } else {
255 return Err(CompatibilityError::MissingUnionElements);
256 }
257 } else if let Schema::Union(u) = readers_schema {
258 if u.schemas
262 .iter()
263 .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok())
264 {
265 return Ok(());
266 }
267 }
268 Err(CompatibilityError::MissingUnionElements)
269 }
270
271 fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
272 let mut hasher = DefaultHasher::new();
273 ptr::hash(writers_schema, &mut hasher);
274 let w_hash = hasher.finish();
275
276 hasher = DefaultHasher::new();
277 ptr::hash(readers_schema, &mut hasher);
278 let r_hash = hasher.finish();
279
280 let key = (w_hash, r_hash);
281 !self.recursion.insert(key)
284 }
285}
286
287impl SchemaCompatibility {
288 pub fn can_read(
291 writers_schema: &Schema,
292 readers_schema: &Schema,
293 ) -> Result<(), CompatibilityError> {
294 let mut c = Checker::new();
295 c.can_read(writers_schema, readers_schema)
296 }
297
298 pub fn mutual_read(
301 writers_schema: &Schema,
302 readers_schema: &Schema,
303 ) -> Result<(), CompatibilityError> {
304 SchemaCompatibility::can_read(writers_schema, readers_schema)?;
305 SchemaCompatibility::can_read(readers_schema, writers_schema)
306 }
307
308 pub(crate) fn match_schemas(
314 writers_schema: &Schema,
315 readers_schema: &Schema,
316 ) -> Result<(), CompatibilityError> {
317 fn check_reader_type_multi(
318 reader_type: SchemaKind,
319 allowed_reader_types: Vec<SchemaKind>,
320 writer_type: SchemaKind,
321 ) -> Result<(), CompatibilityError> {
322 if allowed_reader_types.contains(&reader_type) {
323 Ok(())
324 } else {
325 let mut allowed_types: Vec<SchemaKind> = vec![writer_type];
326 allowed_types.extend_from_slice(allowed_reader_types.as_slice());
327 Err(CompatibilityError::TypeExpected {
328 schema_type: String::from("readers_schema"),
329 expected_type: allowed_types,
330 })
331 }
332 }
333
334 fn check_reader_type(
335 reader_type: SchemaKind,
336 allowed_reader_type: SchemaKind,
337 writer_type: SchemaKind,
338 ) -> Result<(), CompatibilityError> {
339 if reader_type == allowed_reader_type {
340 Ok(())
341 } else {
342 Err(CompatibilityError::TypeExpected {
343 schema_type: String::from("readers_schema"),
344 expected_type: vec![writer_type, allowed_reader_type],
345 })
346 }
347 }
348
349 fn check_writer_type(
350 writers_schema: &Schema,
351 allowed_schema: &Schema,
352 expected_schema_types: Vec<SchemaKind>,
353 ) -> Result<(), CompatibilityError> {
354 if *allowed_schema == *writers_schema {
355 Ok(())
356 } else {
357 Err(CompatibilityError::TypeExpected {
358 schema_type: String::from("writers_schema"),
359 expected_type: expected_schema_types,
360 })
361 }
362 }
363
364 let w_type = SchemaKind::from(writers_schema);
365 let r_type = SchemaKind::from(readers_schema);
366
367 if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
368 return Ok(());
369 }
370
371 if w_type == r_type {
372 if r_type.is_primitive() {
373 return Ok(());
374 }
375
376 match r_type {
377 SchemaKind::Record | SchemaKind::Enum => {
378 let msg = format!("A {readers_schema} type must always has a name");
379 let writers_name = writers_schema.name().expect(&msg);
380 let readers_name = readers_schema.name().expect(&msg);
381
382 if writers_name.name == readers_name.name {
383 return Ok(());
384 }
385
386 return Err(CompatibilityError::NameMismatch {
387 writer_name: writers_name.name.clone(),
388 reader_name: readers_name.name.clone(),
389 });
390 }
391 SchemaKind::Fixed => {
392 if let Schema::Fixed(FixedSchema {
393 name: w_name,
394 aliases: _,
395 doc: _w_doc,
396 size: w_size,
397 default: _w_default,
398 attributes: _,
399 }) = writers_schema
400 {
401 if let Schema::Fixed(FixedSchema {
402 name: r_name,
403 aliases: _,
404 doc: _r_doc,
405 size: r_size,
406 default: _r_default,
407 attributes: _,
408 }) = readers_schema
409 {
410 return (w_name.name == r_name.name && w_size == r_size)
411 .then_some(())
412 .ok_or(CompatibilityError::FixedMismatch);
413 }
414 }
415 }
416 SchemaKind::Map => {
417 if let Schema::Map(w_m) = writers_schema {
418 if let Schema::Map(r_m) = readers_schema {
419 return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
420 }
421 }
422 }
423 SchemaKind::Array => {
424 if let Schema::Array(w_a) = writers_schema {
425 if let Schema::Array(r_a) = readers_schema {
426 return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
427 }
428 }
429 }
430 SchemaKind::Uuid => match readers_schema {
431 Schema::Uuid(UuidSchema::Bytes) => {
432 return check_writer_type(
433 writers_schema,
434 readers_schema,
435 vec![r_type, SchemaKind::Bytes],
436 );
437 }
438 Schema::Uuid(UuidSchema::String) => {
439 return check_writer_type(
440 writers_schema,
441 readers_schema,
442 vec![r_type, SchemaKind::String],
443 );
444 }
445 Schema::Uuid(UuidSchema::Fixed(FixedSchema {
446 name: r_name,
447 size: r_size,
448 ..
449 })) => match writers_schema {
450 Schema::Uuid(UuidSchema::Fixed(FixedSchema {
451 name: w_name,
452 size: w_size,
453 ..
454 }))
455 | Schema::Fixed(FixedSchema {
456 name: w_name,
457 size: w_size,
458 ..
459 }) => {
460 return (w_name.name == r_name.name && w_size == r_size)
461 .then_some(())
462 .ok_or(CompatibilityError::FixedMismatch);
463 }
464 _ => {
465 return Err(CompatibilityError::TypeExpected {
466 schema_type: String::from("writers_schema"),
467 expected_type: vec![SchemaKind::Uuid, SchemaKind::Fixed],
468 });
469 }
470 },
471 Schema::Null
472 | Schema::Boolean
473 | Schema::Int
474 | Schema::Long
475 | Schema::Float
476 | Schema::Double
477 | Schema::Bytes
478 | Schema::String
479 | Schema::Array(_)
480 | Schema::Map(_)
481 | Schema::Union(_)
482 | Schema::Record(_)
483 | Schema::Enum(_)
484 | Schema::Fixed(_)
485 | Schema::Decimal(_)
486 | Schema::BigDecimal
487 | Schema::Date
488 | Schema::TimeMillis
489 | Schema::TimeMicros
490 | Schema::TimestampMillis
491 | Schema::TimestampMicros
492 | Schema::TimestampNanos
493 | Schema::LocalTimestampMillis
494 | Schema::LocalTimestampMicros
495 | Schema::LocalTimestampNanos
496 | Schema::Duration
497 | Schema::Ref { .. } => {
498 unreachable!("SchemaKind::Uuid can only be a Schema::Uuid")
499 }
500 },
501 SchemaKind::Date | SchemaKind::TimeMillis => {
502 return check_writer_type(
503 writers_schema,
504 readers_schema,
505 vec![r_type, SchemaKind::Int],
506 );
507 }
508 SchemaKind::TimeMicros
509 | SchemaKind::TimestampNanos
510 | SchemaKind::TimestampMillis
511 | SchemaKind::TimestampMicros
512 | SchemaKind::LocalTimestampMillis
513 | SchemaKind::LocalTimestampMicros
514 | SchemaKind::LocalTimestampNanos => {
515 return check_writer_type(
516 writers_schema,
517 readers_schema,
518 vec![r_type, SchemaKind::Long],
519 );
520 }
521 SchemaKind::Duration => {
522 return Ok(());
523 }
524 SchemaKind::Ref => return match_ref_schemas(writers_schema, readers_schema),
525 _ => {
526 return Err(CompatibilityError::Inconclusive(String::from(
527 "readers_schema",
528 )));
529 }
530 };
531 }
532
533 match w_type {
535 SchemaKind::Int => check_reader_type_multi(
536 r_type,
537 vec![
538 SchemaKind::Long,
539 SchemaKind::Float,
540 SchemaKind::Double,
541 SchemaKind::Date,
542 SchemaKind::TimeMillis,
543 ],
544 w_type,
545 ),
546 SchemaKind::Long => check_reader_type_multi(
547 r_type,
548 vec![
549 SchemaKind::Float,
550 SchemaKind::Double,
551 SchemaKind::TimeMicros,
552 SchemaKind::TimestampMillis,
553 SchemaKind::TimestampMicros,
554 SchemaKind::TimestampNanos,
555 SchemaKind::LocalTimestampMillis,
556 SchemaKind::LocalTimestampMicros,
557 SchemaKind::LocalTimestampNanos,
558 ],
559 w_type,
560 ),
561 SchemaKind::Float => {
562 check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type)
563 }
564 SchemaKind::String => {
565 check_reader_type_multi(r_type, vec![SchemaKind::Bytes, SchemaKind::Uuid], w_type)
566 }
567 SchemaKind::Bytes => {
568 check_reader_type_multi(r_type, vec![SchemaKind::String, SchemaKind::Uuid], w_type)
569 }
570 SchemaKind::Uuid => check_reader_type_multi(
571 r_type,
572 vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Fixed],
573 w_type,
574 ),
575 SchemaKind::Fixed => check_reader_type_multi(
576 r_type,
577 vec![SchemaKind::Duration, SchemaKind::Uuid],
578 w_type,
579 ),
580 SchemaKind::Date | SchemaKind::TimeMillis => {
581 check_reader_type(r_type, SchemaKind::Int, w_type)
582 }
583 SchemaKind::TimeMicros
584 | SchemaKind::TimestampMicros
585 | SchemaKind::TimestampMillis
586 | SchemaKind::TimestampNanos
587 | SchemaKind::LocalTimestampMillis
588 | SchemaKind::LocalTimestampMicros
589 | SchemaKind::LocalTimestampNanos => {
590 check_reader_type(r_type, SchemaKind::Long, w_type)
591 }
592 _ => Err(CompatibilityError::Inconclusive(String::from(
593 "writers_schema",
594 ))),
595 }
596 }
597}
598
599#[cfg(test)]
600mod tests {
601 use super::*;
602 use crate::schema::{Name, UuidSchema};
603 use crate::{
604 Codec, Reader, Writer,
605 types::{Record, Value},
606 };
607 use apache_avro_test_helper::TestResult;
608 use rstest::*;
609
610 fn int_array_schema() -> Schema {
611 Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
612 }
613
614 fn long_array_schema() -> Schema {
615 Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
616 }
617
618 fn string_array_schema() -> Schema {
619 Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
620 }
621
622 fn int_map_schema() -> Schema {
623 Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
624 }
625
626 fn long_map_schema() -> Schema {
627 Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
628 }
629
630 fn string_map_schema() -> Schema {
631 Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
632 }
633
634 fn enum1_ab_schema() -> Schema {
635 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
636 }
637
638 fn enum1_abc_schema() -> Schema {
639 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
640 }
641
642 fn enum1_bc_schema() -> Schema {
643 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
644 }
645
646 fn enum2_ab_schema() -> Schema {
647 Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
648 }
649
650 fn empty_record1_schema() -> Schema {
651 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
652 }
653
654 fn empty_record2_schema() -> Schema {
655 Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
656 }
657
658 fn a_int_record1_schema() -> Schema {
659 Schema::parse_str(
660 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
661 )
662 .unwrap()
663 }
664
665 fn a_long_record1_schema() -> Schema {
666 Schema::parse_str(
667 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
668 )
669 .unwrap()
670 }
671
672 fn a_int_b_int_record1_schema() -> Schema {
673 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
674 }
675
676 fn a_dint_record1_schema() -> Schema {
677 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
678 }
679
680 fn a_int_b_dint_record1_schema() -> Schema {
681 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
682 }
683
684 fn a_dint_b_dint_record1_schema() -> Schema {
685 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
686 }
687
688 fn nested_record() -> Schema {
689 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
690 }
691
692 fn nested_optional_record() -> Schema {
693 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
694 }
695
696 fn int_list_record_schema() -> Schema {
697 Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
698 }
699
700 fn long_list_record_schema() -> Schema {
701 Schema::parse_str(
702 r#"
703 {
704 "type":"record", "name":"List", "fields": [
705 {"name": "head", "type": "long"},
706 {"name": "tail", "type": "array", "items": "long"}
707 ]}
708"#,
709 )
710 .unwrap()
711 }
712
713 fn union_schema(schemas: Vec<Schema>) -> Schema {
714 let schema_string = schemas
715 .iter()
716 .map(|s| s.canonical_form())
717 .collect::<Vec<String>>()
718 .join(",");
719 Schema::parse_str(&format!("[{schema_string}]")).unwrap()
720 }
721
722 fn empty_union_schema() -> Schema {
723 union_schema(vec![])
724 }
725
726 fn int_union_schema() -> Schema {
730 union_schema(vec![Schema::Int])
731 }
732
733 fn long_union_schema() -> Schema {
734 union_schema(vec![Schema::Long])
735 }
736
737 fn string_union_schema() -> Schema {
738 union_schema(vec![Schema::String])
739 }
740
741 fn int_string_union_schema() -> Schema {
742 union_schema(vec![Schema::Int, Schema::String])
743 }
744
745 fn string_int_union_schema() -> Schema {
746 union_schema(vec![Schema::String, Schema::Int])
747 }
748
749 #[test]
750 fn test_broken() {
751 assert_eq!(
752 CompatibilityError::MissingUnionElements,
753 SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema())
754 .unwrap_err()
755 )
756 }
757
758 #[test]
759 fn test_incompatible_reader_writer_pairs() {
760 let incompatible_schemas = vec![
761 (Schema::Null, Schema::Int),
763 (Schema::Null, Schema::Long),
764 (Schema::Boolean, Schema::Int),
766 (Schema::Int, Schema::Null),
768 (Schema::Int, Schema::Boolean),
769 (Schema::Int, Schema::Long),
770 (Schema::Int, Schema::Float),
771 (Schema::Int, Schema::Double),
772 (Schema::Long, Schema::Float),
774 (Schema::Long, Schema::Double),
775 (Schema::Float, Schema::Double),
777 (Schema::String, Schema::Boolean),
779 (Schema::String, Schema::Int),
780 (Schema::Bytes, Schema::Null),
782 (Schema::Bytes, Schema::Int),
783 (Schema::TimeMicros, Schema::Int),
785 (Schema::TimestampMillis, Schema::Int),
786 (Schema::TimestampMicros, Schema::Int),
787 (Schema::TimestampNanos, Schema::Int),
788 (Schema::LocalTimestampMillis, Schema::Int),
789 (Schema::LocalTimestampMicros, Schema::Int),
790 (Schema::LocalTimestampNanos, Schema::Int),
791 (Schema::Date, Schema::Long),
792 (Schema::TimeMillis, Schema::Long),
793 (int_array_schema(), long_array_schema()),
795 (int_map_schema(), int_array_schema()),
796 (int_array_schema(), int_map_schema()),
797 (int_map_schema(), long_map_schema()),
798 (enum1_ab_schema(), enum1_abc_schema()),
800 (enum1_bc_schema(), enum1_abc_schema()),
801 (enum1_ab_schema(), enum2_ab_schema()),
802 (Schema::Int, enum2_ab_schema()),
803 (enum2_ab_schema(), Schema::Int),
804 (int_union_schema(), int_string_union_schema()),
806 (string_union_schema(), int_string_union_schema()),
807 (empty_record2_schema(), empty_record1_schema()),
809 (a_int_record1_schema(), empty_record1_schema()),
810 (a_int_b_dint_record1_schema(), empty_record1_schema()),
811 (int_list_record_schema(), long_list_record_schema()),
812 (nested_record(), nested_optional_record()),
813 ];
814
815 assert!(
816 incompatible_schemas
817 .iter()
818 .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
819 );
820 }
821
822 #[rstest]
823 #[case(
825 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
826 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
827 )]
828 #[case(
830 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
831 r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
832 )]
833 #[case(
835 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
836 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
837 )]
838 #[case(
840 r#"{"type": "map", "values": "int"}"#,
841 r#"{"type": "map", "values": "long"}"#
842 )]
843 #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
845 #[case(
847 r#"{"type": "int"}"#,
848 r#"{"type": "int", "logicalType": "time-millis"}"#
849 )]
850 #[case(
852 r#"{"type": "long"}"#,
853 r#"{"type": "long", "logicalType": "time-micros"}"#
854 )]
855 #[case(
857 r#"{"type": "long"}"#,
858 r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
859 )]
860 #[case(
862 r#"{"type": "long"}"#,
863 r#"{"type": "long", "logicalType": "timestamp-millis"}"#
864 )]
865 #[case(
867 r#"{"type": "long"}"#,
868 r#"{"type": "long", "logicalType": "timestamp-micros"}"#
869 )]
870 #[case(
872 r#"{"type": "long"}"#,
873 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
874 )]
875 #[case(
877 r#"{"type": "long"}"#,
878 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
879 )]
880 #[case(
882 r#"{"type": "long"}"#,
883 r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
884 )]
885 #[case(
887 r#"{"type": "array", "items": "int"}"#,
888 r#"{"type": "array", "items": "long"}"#
889 )]
890 fn test_avro_3950_match_schemas_ok(
891 #[case] writer_schema_str: &str,
892 #[case] reader_schema_str: &str,
893 ) {
894 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
895 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
896
897 assert!(SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).is_ok());
898 }
899
900 #[rstest]
901 #[case(
903 r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
904 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
905 CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
906 )]
907 #[case(
909 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
910 r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
911 CompatibilityError::FixedMismatch
912 )]
913 #[case(
915 r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
916 r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
917 CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
918 )]
919 #[case(
921 r#"{"type":"map", "values": "long"}"#,
922 r#"{"type":"map", "values": "int"}"#,
923 CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
924 SchemaKind::Long,
925 SchemaKind::Float,
926 SchemaKind::Double,
927 SchemaKind::TimeMicros,
928 SchemaKind::TimestampMillis,
929 SchemaKind::TimestampMicros,
930 SchemaKind::TimestampNanos,
931 SchemaKind::LocalTimestampMillis,
932 SchemaKind::LocalTimestampMicros,
933 SchemaKind::LocalTimestampNanos,
934 ]}
935 )]
936 #[case(
938 r#"{"type": "array", "items": "long"}"#,
939 r#"{"type": "array", "items": "int"}"#,
940 CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
941 SchemaKind::Long,
942 SchemaKind::Float,
943 SchemaKind::Double,
944 SchemaKind::TimeMicros,
945 SchemaKind::TimestampMillis,
946 SchemaKind::TimestampMicros,
947 SchemaKind::TimestampNanos,
948 SchemaKind::LocalTimestampMillis,
949 SchemaKind::LocalTimestampMicros,
950 SchemaKind::LocalTimestampNanos,
951 ]}
952 )]
953 #[case(
955 r#"{"type": "string"}"#,
956 r#"{"type": "int", "logicalType": "date"}"#,
957 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
958 SchemaKind::String,
959 SchemaKind::Bytes,
960 SchemaKind::Uuid,
961 ]}
962 )]
963 #[case(
965 r#"{"type": "string"}"#,
966 r#"{"type": "int", "logicalType": "time-millis"}"#,
967 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
968 SchemaKind::String,
969 SchemaKind::Bytes,
970 SchemaKind::Uuid,
971 ]}
972 )]
973 #[case(
975 r#"{"type": "int"}"#,
976 r#"{"type": "long", "logicalType": "time-micros"}"#,
977 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
978 SchemaKind::Int,
979 SchemaKind::Long,
980 SchemaKind::Float,
981 SchemaKind::Double,
982 SchemaKind::Date,
983 SchemaKind::TimeMillis
984 ]}
985 )]
986 #[case(
1001 r#"{"type": "int"}"#,
1002 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
1003 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
1004 SchemaKind::Int,
1005 SchemaKind::Long,
1006 SchemaKind::Float,
1007 SchemaKind::Double,
1008 SchemaKind::Date,
1009 SchemaKind::TimeMillis
1010 ]}
1011 )]
1012 #[case(
1014 r#"{"type": "int"}"#,
1015 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
1016 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
1017 SchemaKind::Int,
1018 SchemaKind::Long,
1019 SchemaKind::Float,
1020 SchemaKind::Double,
1021 SchemaKind::Date,
1022 SchemaKind::TimeMillis
1023 ]}
1024 )]
1025 #[case(
1027 r#"{"type": "int"}"#,
1028 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
1029 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
1030 SchemaKind::Int,
1031 SchemaKind::Long,
1032 SchemaKind::Float,
1033 SchemaKind::Double,
1034 SchemaKind::Date,
1035 SchemaKind::TimeMillis
1036 ]}
1037 )]
1038 #[case(
1040 r#"{"type": "int"}"#,
1041 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
1042 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
1043 SchemaKind::Int,
1044 SchemaKind::Long,
1045 SchemaKind::Float,
1046 SchemaKind::Double,
1047 SchemaKind::Date,
1048 SchemaKind::TimeMillis
1049 ]}
1050 )]
1051 #[case(
1066 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
1067 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
1068 CompatibilityError::Inconclusive(String::from("writers_schema"))
1069 )]
1070 fn test_avro_3950_match_schemas_error(
1071 #[case] writer_schema_str: &str,
1072 #[case] reader_schema_str: &str,
1073 #[case] expected_error: CompatibilityError,
1074 ) {
1075 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
1076 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
1077
1078 assert_eq!(
1079 expected_error,
1080 SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).unwrap_err()
1081 )
1082 }
1083
1084 #[test]
1085 fn test_compatible_reader_writer_pairs() {
1086 let uuid_fixed = FixedSchema {
1087 name: Name {
1088 name: String::new(),
1089 namespace: None,
1090 },
1091 aliases: None,
1092 doc: None,
1093 size: 16,
1094 default: None,
1095 attributes: Default::default(),
1096 };
1097
1098 let compatible_schemas = vec![
1099 (Schema::Null, Schema::Null),
1100 (Schema::Long, Schema::Int),
1101 (Schema::Float, Schema::Int),
1102 (Schema::Float, Schema::Long),
1103 (Schema::Double, Schema::Long),
1104 (Schema::Double, Schema::Int),
1105 (Schema::Double, Schema::Float),
1106 (Schema::String, Schema::Bytes),
1107 (Schema::Bytes, Schema::String),
1108 (
1110 Schema::Uuid(UuidSchema::String),
1111 Schema::Uuid(UuidSchema::String),
1112 ),
1113 (Schema::Uuid(UuidSchema::String), Schema::String),
1114 (Schema::String, Schema::Uuid(UuidSchema::String)),
1115 (
1116 Schema::Uuid(UuidSchema::Bytes),
1117 Schema::Uuid(UuidSchema::Bytes),
1118 ),
1119 (Schema::Uuid(UuidSchema::Bytes), Schema::Bytes),
1120 (Schema::Bytes, Schema::Uuid(UuidSchema::Bytes)),
1121 (
1122 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1123 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1124 ),
1125 (
1126 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1127 Schema::Fixed(uuid_fixed.clone()),
1128 ),
1129 (
1130 Schema::Fixed(uuid_fixed.clone()),
1131 Schema::Uuid(UuidSchema::Fixed(uuid_fixed.clone())),
1132 ),
1133 (Schema::Date, Schema::Int),
1134 (Schema::TimeMillis, Schema::Int),
1135 (Schema::TimeMicros, Schema::Long),
1136 (Schema::TimestampMillis, Schema::Long),
1137 (Schema::TimestampMicros, Schema::Long),
1138 (Schema::TimestampNanos, Schema::Long),
1139 (Schema::LocalTimestampMillis, Schema::Long),
1140 (Schema::LocalTimestampMicros, Schema::Long),
1141 (Schema::LocalTimestampNanos, Schema::Long),
1142 (Schema::Int, Schema::Date),
1143 (Schema::Int, Schema::TimeMillis),
1144 (Schema::Long, Schema::TimeMicros),
1145 (Schema::Long, Schema::TimestampMillis),
1146 (Schema::Long, Schema::TimestampMicros),
1147 (Schema::Long, Schema::TimestampNanos),
1148 (Schema::Long, Schema::LocalTimestampMillis),
1149 (Schema::Long, Schema::LocalTimestampMicros),
1150 (Schema::Long, Schema::LocalTimestampNanos),
1151 (int_array_schema(), int_array_schema()),
1152 (long_array_schema(), int_array_schema()),
1153 (int_map_schema(), int_map_schema()),
1154 (long_map_schema(), int_map_schema()),
1155 (enum1_ab_schema(), enum1_ab_schema()),
1156 (enum1_abc_schema(), enum1_ab_schema()),
1157 (empty_union_schema(), empty_union_schema()),
1158 (int_union_schema(), int_union_schema()),
1159 (int_string_union_schema(), string_int_union_schema()),
1160 (int_union_schema(), empty_union_schema()),
1161 (long_union_schema(), int_union_schema()),
1162 (int_union_schema(), Schema::Int),
1163 (Schema::Int, int_union_schema()),
1164 (empty_record1_schema(), empty_record1_schema()),
1165 (empty_record1_schema(), a_int_record1_schema()),
1166 (a_int_record1_schema(), a_int_record1_schema()),
1167 (a_dint_record1_schema(), a_int_record1_schema()),
1168 (a_dint_record1_schema(), a_dint_record1_schema()),
1169 (a_int_record1_schema(), a_dint_record1_schema()),
1170 (a_long_record1_schema(), a_int_record1_schema()),
1171 (a_int_record1_schema(), a_int_b_int_record1_schema()),
1172 (a_dint_record1_schema(), a_int_b_int_record1_schema()),
1173 (a_int_b_dint_record1_schema(), a_int_record1_schema()),
1174 (a_dint_b_dint_record1_schema(), empty_record1_schema()),
1175 (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
1176 (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
1177 (int_list_record_schema(), int_list_record_schema()),
1178 (long_list_record_schema(), long_list_record_schema()),
1179 (long_list_record_schema(), int_list_record_schema()),
1180 (nested_optional_record(), nested_record()),
1181 ];
1182
1183 for (reader, writer) in compatible_schemas {
1184 SchemaCompatibility::can_read(&writer, &reader).unwrap();
1185 }
1186 }
1187
1188 fn writer_schema() -> Schema {
1189 Schema::parse_str(
1190 r#"
1191 {"type":"record", "name":"Record", "fields":[
1192 {"name":"oldfield1", "type":"int"},
1193 {"name":"oldfield2", "type":"string"}
1194 ]}
1195"#,
1196 )
1197 .unwrap()
1198 }
1199
/// A schema that keeps only `oldfield1` is accepted against `writer_schema()`,
/// while swapping the two arguments is rejected because `oldfield2` has no
/// default value.
#[test]
fn test_missing_field() -> TestResult {
    let reduced_json = r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"}
      ]}
"#;
    let reader_schema = Schema::parse_str(reduced_json)?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &reader_schema);
    assert!(forward.is_ok());

    let swapped = SchemaCompatibility::can_read(&reader_schema, &writer_schema());
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield2".to_string()),
        swapped.unwrap_err()
    );

    Ok(())
}
1217
/// Same as `test_missing_field`, but the schema under test keeps only
/// `oldfield2`; the swapped direction is expected to report `oldfield1` as
/// lacking a default value.
#[test]
fn test_missing_second_field() -> TestResult {
    let reduced_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield2", "type":"string"}
        ]}
"#;
    let reader_schema = Schema::parse_str(reduced_json)?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &reader_schema);
    assert!(forward.is_ok());

    let swapped = SchemaCompatibility::can_read(&reader_schema, &writer_schema());
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield1".to_string()),
        swapped.unwrap_err()
    );

    Ok(())
}
1235
/// A schema with exactly the same two fields as `writer_schema()` is accepted
/// in both argument orders.
#[test]
fn test_all_fields() -> TestResult {
    let identical_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"oldfield2", "type":"string"}
        ]}
"#;
    let reader_schema = Schema::parse_str(identical_json)?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &reader_schema);
    assert!(forward.is_ok());

    let swapped = SchemaCompatibility::can_read(&reader_schema, &writer_schema());
    assert!(swapped.is_ok());

    Ok(())
}
1251
/// A schema that replaces `oldfield2` with `newfield1` (default `42`) is
/// accepted against `writer_schema()`; with the arguments swapped the check
/// fails because `oldfield2` has no default value.
#[test]
fn test_new_field_with_default() -> TestResult {
    let evolved_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int", "default":42}
        ]}
"#;
    let reader_schema = Schema::parse_str(evolved_json)?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &reader_schema);
    assert!(forward.is_ok());

    let swapped = SchemaCompatibility::can_read(&reader_schema, &writer_schema());
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield2".to_string()),
        swapped.unwrap_err()
    );

    Ok(())
}
1270
/// A schema that replaces `oldfield2` with `newfield1` *without* a default is
/// rejected in both argument orders, each time naming the field that lacks a
/// default value.
#[test]
fn test_new_field() -> TestResult {
    let evolved_json = r#"
        {"type":"record", "name":"Record", "fields":[
          {"name":"oldfield1", "type":"int"},
          {"name":"newfield1", "type":"int"}
        ]}
"#;
    let reader_schema = Schema::parse_str(evolved_json)?;

    let forward = SchemaCompatibility::can_read(&writer_schema(), &reader_schema);
    assert_eq!(
        CompatibilityError::MissingDefaultValue("newfield1".to_string()),
        forward.unwrap_err()
    );

    let swapped = SchemaCompatibility::can_read(&reader_schema, &writer_schema());
    assert_eq!(
        CompatibilityError::MissingDefaultValue("oldfield2".to_string()),
        swapped.unwrap_err()
    );

    Ok(())
}
1292
/// A string-array writer is accepted by a string-array reader, while a
/// string-map reader yields `Inconclusive("writers_schema")`.
#[test]
fn test_array_writer_schema() {
    let valid_reader = string_array_schema();
    let invalid_reader = string_map_schema();

    let accepted = SchemaCompatibility::can_read(&string_array_schema(), &valid_reader);
    assert!(accepted.is_ok());

    let rejected = SchemaCompatibility::can_read(&string_array_schema(), &invalid_reader);
    assert_eq!(
        CompatibilityError::Inconclusive("writers_schema".to_string()),
        rejected.unwrap_err()
    );
}
1304
/// `string` is accepted against `string`; an `int` writer checked against a
/// `string` reader is rejected with a `TypeExpected` error listing the reader
/// kinds that would have been accepted.
#[test]
fn test_primitive_writer_schema() {
    let valid_reader = Schema::String;
    let accepted = SchemaCompatibility::can_read(&Schema::String, &valid_reader);
    assert!(accepted.is_ok());

    let expected_error = CompatibilityError::TypeExpected {
        schema_type: "readers_schema".to_string(),
        expected_type: vec![
            SchemaKind::Int,
            SchemaKind::Long,
            SchemaKind::Float,
            SchemaKind::Double,
            SchemaKind::Date,
            SchemaKind::TimeMillis,
        ],
    };
    let rejected = SchemaCompatibility::can_read(&Schema::Int, &Schema::String);
    assert_eq!(expected_error, rejected.unwrap_err());
}
1324
/// A reader union `[string]` does not cover every branch of the writer union
/// `[int, string]`, so the check reports `MissingUnionElements`; with the
/// roles swapped the check succeeds.
#[test]
fn test_union_reader_writer_subset_incompatibility() {
    let union_writer = union_schema(vec![Schema::Int, Schema::String]);
    let union_reader = union_schema(vec![Schema::String]);

    let narrowed = SchemaCompatibility::can_read(&union_writer, &union_reader);
    assert_eq!(
        CompatibilityError::MissingUnionElements,
        narrowed.unwrap_err()
    );

    let widened = SchemaCompatibility::can_read(&union_reader, &union_writer);
    assert!(widened.is_ok());
}
1337
/// Two `ns.MyRecord` schemas whose `field1` differs in type (`string` vs
/// `int`) are rejected with a `FieldTypeMismatch` that wraps the underlying
/// `TypeExpected` error.
#[test]
fn test_incompatible_record_field() -> TestResult {
    let string_field_json = r#"
      {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
        {"name":"field1", "type":"string"}
      ]}
    "#;
    let int_field_json = r#"
      {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
        {"name":"field1", "type":"int"}
      ]}
    "#;

    let string_schema = Schema::parse_str(string_field_json)?;
    let int_schema = Schema::parse_str(int_field_json)?;

    let inner_error = CompatibilityError::TypeExpected {
        schema_type: "readers_schema".to_owned(),
        expected_type: vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Uuid],
    };
    let expected_error =
        CompatibilityError::FieldTypeMismatch("field1".to_owned(), Box::new(inner_error));

    let outcome = SchemaCompatibility::can_read(&string_schema, &int_schema);
    assert_eq!(expected_error, outcome.unwrap_err());

    Ok(())
}
1369
/// An enum written with symbols `A, B, C` cannot be checked against a reader
/// that only knows `A, B` (`MissingSymbols`); the opposite direction succeeds.
#[test]
fn test_enum_symbols() -> TestResult {
    let two_symbols_json = r#"
      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
"#;
    let three_symbols_json = r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#;

    let enum_schema1 = Schema::parse_str(two_symbols_json)?;
    let enum_schema2 = Schema::parse_str(three_symbols_json)?;

    let narrowed = SchemaCompatibility::can_read(&enum_schema2, &enum_schema1);
    assert_eq!(CompatibilityError::MissingSymbols, narrowed.unwrap_err());

    let widened = SchemaCompatibility::can_read(&enum_schema1, &enum_schema2);
    assert!(widened.is_ok());

    Ok(())
}
1387
1388 fn point_2d_schema() -> Schema {
1389 Schema::parse_str(
1390 r#"
1391 {"type":"record", "name":"Point2D", "fields":[
1392 {"name":"x", "type":"double"},
1393 {"name":"y", "type":"double"}
1394 ]}
1395 "#,
1396 )
1397 .unwrap()
1398 }
1399
1400 fn point_2d_fullname_schema() -> Schema {
1401 Schema::parse_str(
1402 r#"
1403 {"type":"record", "name":"Point", "namespace":"written", "fields":[
1404 {"name":"x", "type":"double"},
1405 {"name":"y", "type":"double"}
1406 ]}
1407 "#,
1408 )
1409 .unwrap()
1410 }
1411
1412 fn point_3d_no_default_schema() -> Schema {
1413 Schema::parse_str(
1414 r#"
1415 {"type":"record", "name":"Point", "fields":[
1416 {"name":"x", "type":"double"},
1417 {"name":"y", "type":"double"},
1418 {"name":"z", "type":"double"}
1419 ]}
1420 "#,
1421 )
1422 .unwrap()
1423 }
1424
1425 fn point_3d_schema() -> Schema {
1426 Schema::parse_str(
1427 r#"
1428 {"type":"record", "name":"Point3D", "fields":[
1429 {"name":"x", "type":"double"},
1430 {"name":"y", "type":"double"},
1431 {"name":"z", "type":"double", "default": 0.0}
1432 ]}
1433 "#,
1434 )
1435 .unwrap()
1436 }
1437
1438 fn point_3d_match_name_schema() -> Schema {
1439 Schema::parse_str(
1440 r#"
1441 {"type":"record", "name":"Point", "fields":[
1442 {"name":"x", "type":"double"},
1443 {"name":"y", "type":"double"},
1444 {"name":"z", "type":"double", "default": 0.0}
1445 ]}
1446 "#,
1447 )
1448 .unwrap()
1449 }
1450
/// Writer `written.Point` (x, y) against a reader union of `null` and a
/// `Point` record whose extra field `z` has no default: the check is expected
/// to report `MissingUnionElements`.
#[test]
fn test_union_resolution_no_structure_match() {
    let reader_branches = vec![Schema::Null, point_3d_no_default_schema()];
    let read_schema = union_schema(reader_branches);

    let outcome = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema);
    assert_eq!(
        CompatibilityError::MissingUnionElements,
        outcome.unwrap_err()
    );
}
1460
/// Writer `written.Point` against a reader union listing the no-default
/// `Point`, then `Point2D`, then `Point3D`: the check is expected to report
/// `MissingUnionElements`.
#[test]
fn test_union_resolution_first_structure_match_2d() {
    let reader_branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_2d_schema(),
        point_3d_schema(),
    ];
    let read_schema = union_schema(reader_branches);

    let outcome = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema);
    assert_eq!(
        CompatibilityError::MissingUnionElements,
        outcome.unwrap_err()
    );
}
1475
/// Same reader branches as the `_2d` variant but with `Point3D` listed before
/// `Point2D`: the check is still expected to report `MissingUnionElements`.
#[test]
fn test_union_resolution_first_structure_match_3d() {
    let reader_branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_3d_schema(),
        point_2d_schema(),
    ];
    let read_schema = union_schema(reader_branches);

    let outcome = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema);
    assert_eq!(
        CompatibilityError::MissingUnionElements,
        outcome.unwrap_err()
    );
}
1490
/// Writer `written.Point` against a reader union of `Point2D`, the
/// un-namespaced `Point` (with a defaulted `z`) and `Point3D`: the check is
/// expected to report `MissingUnionElements`.
#[test]
fn test_union_resolution_named_structure_match() {
    let reader_branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
    ];
    let read_schema = union_schema(reader_branches);

    let outcome = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema);
    assert_eq!(
        CompatibilityError::MissingUnionElements,
        outcome.unwrap_err()
    );
}
1505
/// When the reader union also contains the writer's own schema
/// (`written.Point`), the compatibility check succeeds.
#[test]
fn test_union_resolution_full_name_match() {
    let reader_branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
        point_2d_fullname_schema(),
    ];
    let read_schema = union_schema(reader_branches);

    let outcome = SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema);
    assert!(outcome.is_ok());
}
1518
/// AVRO-3772: writes a record whose enum field `c` holds `clubs`, then reads
/// it back with a reader schema whose enum has `ninja` in place of `clubs`.
/// The decoded value is expected to be the reader enum's default, `spades`
/// (index 1).
#[test]
fn test_avro_3772_enum_default() -> TestResult {
    let writer_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "clubs", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;

    let reader_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "ninja", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;

    // Encode a single record with the writer schema.
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
    let mut record =
        Record::new(writer.schema()).expect("the writer schema is a record schema");
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "clubs");
    // Propagate encoding failures like every other fallible call in this test.
    writer.append(record)?;
    let input = writer.into_inner()?;

    // Decode it with the reader schema and compare against the resolved value.
    let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
    let decoded = reader
        .next()
        .expect("exactly one record was written")?;
    assert_eq!(
        decoded,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(1, "spades".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1582
/// AVRO-3772: writes a record whose enum field `c` holds `hearts`, then reads
/// it back with a reader schema whose enum only lists `hearts` and `spades`.
/// The decoded value is expected to be `hearts` at the reader's index 0.
#[test]
fn test_avro_3772_enum_default_less_symbols() -> TestResult {
    let writer_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "clubs", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;

    let reader_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["hearts", "spades"],
            "default": "spades"
          }
        }
      ]
    }
    "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;

    // Encode a single record with the writer schema.
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null)?;
    let mut record =
        Record::new(writer.schema()).expect("the writer schema is a record schema");
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "hearts");
    // Propagate encoding failures like every other fallible call in this test.
    writer.append(record)?;
    let input = writer.into_inner()?;

    // Decode it with the reader schema and compare against the resolved value.
    let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
    let decoded = reader
        .next()
        .expect("exactly one record was written")?;
    assert_eq!(
        decoded,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(0, "hearts".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1646
/// AVRO-3894: adding the alias `time` to the `date` field keeps the two
/// `advdaba.Conference` schemas mutually readable.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
{
    let without_alias_json = r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date"}
            ]
        }"#;
    let with_alias_json = r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date", "aliases" : [ "time" ]}
            ]
        }"#;

    let schema_v1 = Schema::parse_str(without_alias_json)?;
    let schema_v2 = Schema::parse_str(with_alias_json)?;

    let outcome = SchemaCompatibility::mutual_read(&schema_v1, &schema_v2);
    assert!(outcome.is_ok());

    Ok(())
}
1680
/// AVRO-3917: a schema whose field `date` carries the alias `time` is
/// accepted against a schema whose field is actually named `time`; with the
/// arguments swapped the check fails with `MissingDefaultValue("time")`.
#[test]
fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
    let aliased_json = r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date", "aliases" : [ "time" ]}
            ]
        }"#;
    let renamed_json = r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "time"}
            ]
        }"#;

    let schema_v1 = Schema::parse_str(aliased_json)?;
    let schema_v2 = Schema::parse_str(renamed_json)?;

    let via_alias = SchemaCompatibility::can_read(&schema_v2, &schema_v1);
    assert!(via_alias.is_ok());

    let swapped = SchemaCompatibility::can_read(&schema_v1, &schema_v2);
    assert_eq!(
        CompatibilityError::MissingDefaultValue("time".to_string()),
        swapped.unwrap_err()
    );

    Ok(())
}
1717
/// AVRO-3898: for a record, an enum and a fixed schema, a definition without
/// a namespace is accepted against a definition of the same name placed in
/// `my.namespace`.
#[test]
fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
    // Record schemas
    let record_pair = (
        Schema::parse_str(
            r#"{
              "type": "record",
              "name": "Statistics",
              "fields": [
                { "name": "success", "type": "int" },
                { "name": "fail", "type": "int" },
                { "name": "time", "type": "string" },
                { "name": "max", "type": "int", "default": 0 }
              ]
            }"#,
        )?,
        Schema::parse_str(
            r#"{
              "type": "record",
              "name": "Statistics",
              "namespace": "my.namespace",
              "fields": [
                { "name": "success", "type": "int" },
                { "name": "fail", "type": "int" },
                { "name": "time", "type": "string" },
                { "name": "average", "type": "int", "default": 0}
              ]
            }"#,
        )?,
    );

    // Enum schemas
    let enum_pair = (
        Schema::parse_str(
            r#"{
              "type": "enum",
              "name": "Suit",
              "symbols": ["diamonds", "spades", "clubs"]
            }"#,
        )?,
        Schema::parse_str(
            r#"{
              "type": "enum",
              "name": "Suit",
              "namespace": "my.namespace",
              "symbols": ["diamonds", "spades", "clubs", "hearts"]
            }"#,
        )?,
    );

    // Fixed schemas
    let fixed_pair = (
        Schema::parse_str(
            r#"{
              "type": "fixed",
              "name": "EmployeeId",
              "size": 16
            }"#,
        )?,
        Schema::parse_str(
            r#"{
              "type": "fixed",
              "name": "EmployeeId",
              "namespace": "my.namespace",
              "size": 16
            }"#,
        )?,
    );

    let schemas = [record_pair, enum_pair, fixed_pair];
    for (unqualified, namespaced) in &schemas {
        let outcome = SchemaCompatibility::can_read(unqualified, namespaced);
        assert!(outcome.is_ok());
    }

    Ok(())
}
1793
/// For each pair, the schema with a plain map/array field is accepted against
/// the schema whose field is a nullable union of that type; with the
/// arguments swapped the check fails, and the rendered error message is
/// compared verbatim.
#[test]
fn test_can_read_compatibility_errors() -> TestResult {
    let map_case = (
        Schema::parse_str(
            r#"{
                "type": "record",
                "name": "StatisticsMap",
                "fields": [
                    {"name": "average", "type": "int", "default": 0},
                    {"name": "success", "type": {"type": "map", "values": "int"}}
                ]
            }"#,
        )?,
        Schema::parse_str(
            r#"{
                "type": "record",
                "name": "StatisticsMap",
                "fields": [
                    {"name": "average", "type": "int", "default": 0},
                    {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
                ]
            }"#,
        )?,
        "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema",
    );

    let array_case = (
        Schema::parse_str(
            r#"{
                "type": "record",
                "name": "StatisticsArray",
                "fields": [
                    {"name": "max_values", "type": {"type": "array", "items": "int"}}
                ]
            }"#,
        )?,
        Schema::parse_str(
            r#"{
                "type": "record",
                "name": "StatisticsArray",
                "fields": [
                    {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
                ]
            }"#,
        )?,
        "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema",
    );

    for (plain, nullable, expected_message) in [map_case, array_case] {
        let forward = SchemaCompatibility::can_read(&plain, &nullable);
        assert!(forward.is_ok());

        let swapped = SchemaCompatibility::can_read(&nullable, &plain);
        let actual_message = swapped.unwrap_err().to_string();
        assert_eq!(expected_message, actual_message);
    }

    Ok(())
}
1855
/// AVRO-3974: a `Parent` record whose field refers to `avro.Child` by name
/// can be checked for compatibility against itself without an error.
#[test]
fn avro_3974_can_read_schema_references() -> TestResult {
    let schema_strs = vec![
        r#"{
          "type": "record",
          "name": "Child",
          "namespace": "avro",
          "fields": [
            {
              "name": "val",
              "type": "int"
            }
          ]
        }
        "#,
        r#"{
          "type": "record",
          "name": "Parent",
          "namespace": "avro",
          "fields": [
            {
              "name": "child",
              "type": "avro.Child"
            }
          ]
        }
        "#,
    ];

    // Propagate parse failures with `?`, like the compatibility check below.
    let schemas = Schema::parse_list(schema_strs)?;

    // Index 1 is `Parent`, the schema that contains the reference to `Child`.
    let parent = &schemas[1];
    SchemaCompatibility::can_read(parent, parent)?;

    Ok(())
}
1890}