1use crate::{
20 error::CompatibilityError,
21 schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
22};
23use std::{
24 collections::{HashSet, hash_map::DefaultHasher},
25 hash::Hasher,
26 ptr,
27};
28
29fn match_ref_schemas(
30 writers_schema: &Schema,
31 readers_schema: &Schema,
32) -> Result<(), CompatibilityError> {
33 match (readers_schema, writers_schema) {
34 (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
35 if r_name == w_name {
36 Ok(())
37 } else {
38 Err(CompatibilityError::NameMismatch {
39 writer_name: w_name.fullname(None),
40 reader_name: r_name.fullname(None),
41 })
42 }
43 }
44 _ => Err(CompatibilityError::WrongType {
45 writer_schema_type: format!("{writers_schema:#?}"),
46 reader_schema_type: format!("{readers_schema:#?}"),
47 }),
48 }
49}
50
/// Namespace for Avro schema compatibility checks.
///
/// This unit struct holds no state; the checks are its associated functions
/// (`can_read`, `mutual_read` and the crate-internal `match_schemas`).
pub struct SchemaCompatibility;
52
/// Stateful helper used by `SchemaCompatibility::can_read` for the deep check.
struct Checker {
    // Every `(writer, reader)` pair already visited, keyed by hashes of the two
    // schemas' memory addresses (not their contents). Entries are never removed,
    // so a pair that is seen again is reported as compatible without re-checking.
    recursion: HashSet<(u64, u64)>,
}
56
57impl Checker {
58 pub(crate) fn new() -> Self {
60 Self {
61 recursion: HashSet::new(),
62 }
63 }
64
65 pub(crate) fn can_read(
66 &mut self,
67 writers_schema: &Schema,
68 readers_schema: &Schema,
69 ) -> Result<(), CompatibilityError> {
70 self.full_match_schemas(writers_schema, readers_schema)
71 }
72
73 pub(crate) fn full_match_schemas(
74 &mut self,
75 writers_schema: &Schema,
76 readers_schema: &Schema,
77 ) -> Result<(), CompatibilityError> {
78 if self.recursion_in_progress(writers_schema, readers_schema) {
79 return Ok(());
80 }
81
82 SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
83
84 let w_type = SchemaKind::from(writers_schema);
85 let r_type = SchemaKind::from(readers_schema);
86
87 if w_type != SchemaKind::Union
88 && (r_type.is_primitive()
89 || r_type == SchemaKind::Fixed
90 || r_type == SchemaKind::Uuid
91 || r_type == SchemaKind::Date
92 || r_type == SchemaKind::TimeMillis
93 || r_type == SchemaKind::TimeMicros
94 || r_type == SchemaKind::TimestampMillis
95 || r_type == SchemaKind::TimestampMicros
96 || r_type == SchemaKind::TimestampNanos
97 || r_type == SchemaKind::LocalTimestampMillis
98 || r_type == SchemaKind::LocalTimestampMicros
99 || r_type == SchemaKind::LocalTimestampNanos)
100 {
101 return Ok(());
102 }
103
104 match r_type {
105 SchemaKind::Ref => match_ref_schemas(writers_schema, readers_schema),
106 SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
107 SchemaKind::Map => {
108 if let Schema::Map(w_m) = writers_schema {
109 match readers_schema {
110 Schema::Map(r_m) => self.full_match_schemas(&w_m.types, &r_m.types),
111 _ => Err(CompatibilityError::WrongType {
112 writer_schema_type: format!("{writers_schema:#?}"),
113 reader_schema_type: format!("{readers_schema:#?}"),
114 }),
115 }
116 } else {
117 Err(CompatibilityError::TypeExpected {
118 schema_type: String::from("writers_schema"),
119 expected_type: vec![SchemaKind::Record],
120 })
121 }
122 }
123 SchemaKind::Array => {
124 if let Schema::Array(w_a) = writers_schema {
125 match readers_schema {
126 Schema::Array(r_a) => self.full_match_schemas(&w_a.items, &r_a.items),
127 _ => Err(CompatibilityError::WrongType {
128 writer_schema_type: format!("{writers_schema:#?}"),
129 reader_schema_type: format!("{readers_schema:#?}"),
130 }),
131 }
132 } else {
133 Err(CompatibilityError::TypeExpected {
134 schema_type: String::from("writers_schema"),
135 expected_type: vec![SchemaKind::Array],
136 })
137 }
138 }
139 SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
140 SchemaKind::Enum => {
141 if let Schema::Enum(EnumSchema {
143 symbols: w_symbols, ..
144 }) = writers_schema
145 {
146 if let Schema::Enum(EnumSchema {
147 symbols: r_symbols, ..
148 }) = readers_schema
149 {
150 if w_symbols.iter().all(|e| r_symbols.contains(e)) {
151 return Ok(());
152 }
153 }
154 }
155 Err(CompatibilityError::MissingSymbols)
156 }
157 _ => {
158 if w_type == SchemaKind::Union {
159 if let Schema::Union(r) = writers_schema {
160 if r.schemas.len() == 1 {
161 return self.full_match_schemas(&r.schemas[0], readers_schema);
162 }
163 }
164 }
165 Err(CompatibilityError::Inconclusive(String::from(
166 "writers_schema",
167 )))
168 }
169 }
170 }
171
172 fn match_record_schemas(
173 &mut self,
174 writers_schema: &Schema,
175 readers_schema: &Schema,
176 ) -> Result<(), CompatibilityError> {
177 let w_type = SchemaKind::from(writers_schema);
178
179 if w_type == SchemaKind::Union {
180 return Err(CompatibilityError::TypeExpected {
181 schema_type: String::from("writers_schema"),
182 expected_type: vec![SchemaKind::Record],
183 });
184 }
185
186 if let Schema::Record(RecordSchema {
187 fields: w_fields,
188 lookup: w_lookup,
189 ..
190 }) = writers_schema
191 {
192 if let Schema::Record(RecordSchema {
193 fields: r_fields, ..
194 }) = readers_schema
195 {
196 for field in r_fields.iter() {
197 let mut fields_names = vec![&field.name];
199 if let Some(ref aliases) = field.aliases {
200 for alias in aliases {
201 fields_names.push(alias);
202 }
203 }
204
205 let position = fields_names.iter().find_map(|field_name| {
209 if let Some(pos) = w_lookup.get(*field_name) {
210 if &w_fields[*pos].name == *field_name {
211 return Some(pos);
212 }
213 }
214 None
215 });
216
217 match position {
218 Some(pos) => {
219 if let Err(err) =
220 self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
221 {
222 return Err(CompatibilityError::FieldTypeMismatch(
223 field.name.clone(),
224 Box::new(err),
225 ));
226 }
227 }
228 _ => {
229 if field.default.is_none() {
230 return Err(CompatibilityError::MissingDefaultValue(
231 field.name.clone(),
232 ));
233 }
234 }
235 }
236 }
237 }
238 }
239 Ok(())
240 }
241
242 fn match_union_schemas(
243 &mut self,
244 writers_schema: &Schema,
245 readers_schema: &Schema,
246 ) -> Result<(), CompatibilityError> {
247 if let Schema::Union(u) = writers_schema {
248 if u.schemas
249 .iter()
250 .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok())
251 {
252 return Ok(());
253 } else {
254 return Err(CompatibilityError::MissingUnionElements);
255 }
256 } else if let Schema::Union(u) = readers_schema {
257 if u.schemas
261 .iter()
262 .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok())
263 {
264 return Ok(());
265 }
266 }
267 Err(CompatibilityError::MissingUnionElements)
268 }
269
270 fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
271 let mut hasher = DefaultHasher::new();
272 ptr::hash(writers_schema, &mut hasher);
273 let w_hash = hasher.finish();
274
275 hasher = DefaultHasher::new();
276 ptr::hash(readers_schema, &mut hasher);
277 let r_hash = hasher.finish();
278
279 let key = (w_hash, r_hash);
280 !self.recursion.insert(key)
283 }
284}
285
286impl SchemaCompatibility {
287 pub fn can_read(
290 writers_schema: &Schema,
291 readers_schema: &Schema,
292 ) -> Result<(), CompatibilityError> {
293 let mut c = Checker::new();
294 c.can_read(writers_schema, readers_schema)
295 }
296
297 pub fn mutual_read(
300 writers_schema: &Schema,
301 readers_schema: &Schema,
302 ) -> Result<(), CompatibilityError> {
303 SchemaCompatibility::can_read(writers_schema, readers_schema)?;
304 SchemaCompatibility::can_read(readers_schema, writers_schema)
305 }
306
307 pub(crate) fn match_schemas(
313 writers_schema: &Schema,
314 readers_schema: &Schema,
315 ) -> Result<(), CompatibilityError> {
316 fn check_reader_type_multi(
317 reader_type: SchemaKind,
318 allowed_reader_types: Vec<SchemaKind>,
319 writer_type: SchemaKind,
320 ) -> Result<(), CompatibilityError> {
321 if allowed_reader_types.contains(&reader_type) {
322 Ok(())
323 } else {
324 let mut allowed_types: Vec<SchemaKind> = vec![writer_type];
325 allowed_types.extend_from_slice(allowed_reader_types.as_slice());
326 Err(CompatibilityError::TypeExpected {
327 schema_type: String::from("readers_schema"),
328 expected_type: allowed_types,
329 })
330 }
331 }
332
333 fn check_reader_type(
334 reader_type: SchemaKind,
335 allowed_reader_type: SchemaKind,
336 writer_type: SchemaKind,
337 ) -> Result<(), CompatibilityError> {
338 if reader_type == allowed_reader_type {
339 Ok(())
340 } else {
341 Err(CompatibilityError::TypeExpected {
342 schema_type: String::from("readers_schema"),
343 expected_type: vec![writer_type, allowed_reader_type],
344 })
345 }
346 }
347
348 fn check_writer_type(
349 writers_schema: &Schema,
350 allowed_schema: &Schema,
351 expected_schema_types: Vec<SchemaKind>,
352 ) -> Result<(), CompatibilityError> {
353 if *allowed_schema == *writers_schema {
354 Ok(())
355 } else {
356 Err(CompatibilityError::TypeExpected {
357 schema_type: String::from("writers_schema"),
358 expected_type: expected_schema_types,
359 })
360 }
361 }
362
363 let w_type = SchemaKind::from(writers_schema);
364 let r_type = SchemaKind::from(readers_schema);
365
366 if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
367 return Ok(());
368 }
369
370 if w_type == r_type {
371 if r_type.is_primitive() {
372 return Ok(());
373 }
374
375 match r_type {
376 SchemaKind::Record | SchemaKind::Enum => {
377 let msg = format!("A {readers_schema} type must always has a name");
378 let writers_name = writers_schema.name().expect(&msg);
379 let readers_name = readers_schema.name().expect(&msg);
380
381 if writers_name.name == readers_name.name {
382 return Ok(());
383 }
384
385 return Err(CompatibilityError::NameMismatch {
386 writer_name: writers_name.name.clone(),
387 reader_name: readers_name.name.clone(),
388 });
389 }
390 SchemaKind::Fixed => {
391 if let Schema::Fixed(FixedSchema {
392 name: w_name,
393 aliases: _,
394 doc: _w_doc,
395 size: w_size,
396 default: _w_default,
397 attributes: _,
398 }) = writers_schema
399 {
400 if let Schema::Fixed(FixedSchema {
401 name: r_name,
402 aliases: _,
403 doc: _r_doc,
404 size: r_size,
405 default: _r_default,
406 attributes: _,
407 }) = readers_schema
408 {
409 return (w_name.name == r_name.name && w_size == r_size)
410 .then_some(())
411 .ok_or(CompatibilityError::FixedMismatch);
412 }
413 }
414 }
415 SchemaKind::Map => {
416 if let Schema::Map(w_m) = writers_schema {
417 if let Schema::Map(r_m) = readers_schema {
418 return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
419 }
420 }
421 }
422 SchemaKind::Array => {
423 if let Schema::Array(w_a) = writers_schema {
424 if let Schema::Array(r_a) = readers_schema {
425 return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
426 }
427 }
428 }
429 SchemaKind::Uuid => {
430 return check_writer_type(
431 writers_schema,
432 readers_schema,
433 vec![r_type, SchemaKind::String],
434 );
435 }
436 SchemaKind::Date | SchemaKind::TimeMillis => {
437 return check_writer_type(
438 writers_schema,
439 readers_schema,
440 vec![r_type, SchemaKind::Int],
441 );
442 }
443 SchemaKind::TimeMicros
444 | SchemaKind::TimestampNanos
445 | SchemaKind::TimestampMillis
446 | SchemaKind::TimestampMicros
447 | SchemaKind::LocalTimestampMillis
448 | SchemaKind::LocalTimestampMicros
449 | SchemaKind::LocalTimestampNanos => {
450 return check_writer_type(
451 writers_schema,
452 readers_schema,
453 vec![r_type, SchemaKind::Long],
454 );
455 }
456 SchemaKind::Duration => {
457 return Ok(());
458 }
459 SchemaKind::Ref => return match_ref_schemas(writers_schema, readers_schema),
460 _ => {
461 return Err(CompatibilityError::Inconclusive(String::from(
462 "readers_schema",
463 )));
464 }
465 };
466 }
467
468 match w_type {
470 SchemaKind::Int => check_reader_type_multi(
471 r_type,
472 vec![
473 SchemaKind::Long,
474 SchemaKind::Float,
475 SchemaKind::Double,
476 SchemaKind::Date,
477 SchemaKind::TimeMillis,
478 ],
479 w_type,
480 ),
481 SchemaKind::Long => check_reader_type_multi(
482 r_type,
483 vec![
484 SchemaKind::Float,
485 SchemaKind::Double,
486 SchemaKind::TimeMicros,
487 SchemaKind::TimestampMillis,
488 SchemaKind::TimestampMicros,
489 SchemaKind::TimestampNanos,
490 SchemaKind::LocalTimestampMillis,
491 SchemaKind::LocalTimestampMicros,
492 SchemaKind::LocalTimestampNanos,
493 ],
494 w_type,
495 ),
496 SchemaKind::Float => {
497 check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type)
498 }
499 SchemaKind::String => {
500 check_reader_type_multi(r_type, vec![SchemaKind::Bytes, SchemaKind::Uuid], w_type)
501 }
502 SchemaKind::Bytes => check_reader_type(r_type, SchemaKind::String, w_type),
503 SchemaKind::Uuid => check_reader_type(r_type, SchemaKind::String, w_type),
504 SchemaKind::Date | SchemaKind::TimeMillis => {
505 check_reader_type(r_type, SchemaKind::Int, w_type)
506 }
507 SchemaKind::TimeMicros
508 | SchemaKind::TimestampMicros
509 | SchemaKind::TimestampMillis
510 | SchemaKind::TimestampNanos
511 | SchemaKind::LocalTimestampMillis
512 | SchemaKind::LocalTimestampMicros
513 | SchemaKind::LocalTimestampNanos => {
514 check_reader_type(r_type, SchemaKind::Long, w_type)
515 }
516 _ => Err(CompatibilityError::Inconclusive(String::from(
517 "writers_schema",
518 ))),
519 }
520 }
521}
522
523#[cfg(test)]
524mod tests {
525 use super::*;
526 use crate::{
527 Codec, Reader, Writer,
528 types::{Record, Value},
529 };
530 use apache_avro_test_helper::TestResult;
531 use rstest::*;
532
533 fn int_array_schema() -> Schema {
534 Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
535 }
536
537 fn long_array_schema() -> Schema {
538 Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
539 }
540
541 fn string_array_schema() -> Schema {
542 Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
543 }
544
545 fn int_map_schema() -> Schema {
546 Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
547 }
548
549 fn long_map_schema() -> Schema {
550 Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
551 }
552
553 fn string_map_schema() -> Schema {
554 Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
555 }
556
557 fn enum1_ab_schema() -> Schema {
558 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
559 }
560
561 fn enum1_abc_schema() -> Schema {
562 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
563 }
564
565 fn enum1_bc_schema() -> Schema {
566 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
567 }
568
569 fn enum2_ab_schema() -> Schema {
570 Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
571 }
572
573 fn empty_record1_schema() -> Schema {
574 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
575 }
576
577 fn empty_record2_schema() -> Schema {
578 Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
579 }
580
581 fn a_int_record1_schema() -> Schema {
582 Schema::parse_str(
583 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
584 )
585 .unwrap()
586 }
587
588 fn a_long_record1_schema() -> Schema {
589 Schema::parse_str(
590 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
591 )
592 .unwrap()
593 }
594
595 fn a_int_b_int_record1_schema() -> Schema {
596 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
597 }
598
599 fn a_dint_record1_schema() -> Schema {
600 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
601 }
602
603 fn a_int_b_dint_record1_schema() -> Schema {
604 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
605 }
606
607 fn a_dint_b_dint_record1_schema() -> Schema {
608 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
609 }
610
611 fn nested_record() -> Schema {
612 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
613 }
614
615 fn nested_optional_record() -> Schema {
616 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
617 }
618
619 fn int_list_record_schema() -> Schema {
620 Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
621 }
622
623 fn long_list_record_schema() -> Schema {
624 Schema::parse_str(
625 r#"
626 {
627 "type":"record", "name":"List", "fields": [
628 {"name": "head", "type": "long"},
629 {"name": "tail", "type": "array", "items": "long"}
630 ]}
631"#,
632 )
633 .unwrap()
634 }
635
636 fn union_schema(schemas: Vec<Schema>) -> Schema {
637 let schema_string = schemas
638 .iter()
639 .map(|s| s.canonical_form())
640 .collect::<Vec<String>>()
641 .join(",");
642 Schema::parse_str(&format!("[{schema_string}]")).unwrap()
643 }
644
645 fn empty_union_schema() -> Schema {
646 union_schema(vec![])
647 }
648
649 fn int_union_schema() -> Schema {
653 union_schema(vec![Schema::Int])
654 }
655
656 fn long_union_schema() -> Schema {
657 union_schema(vec![Schema::Long])
658 }
659
660 fn string_union_schema() -> Schema {
661 union_schema(vec![Schema::String])
662 }
663
664 fn int_string_union_schema() -> Schema {
665 union_schema(vec![Schema::Int, Schema::String])
666 }
667
668 fn string_int_union_schema() -> Schema {
669 union_schema(vec![Schema::String, Schema::Int])
670 }
671
672 #[test]
673 fn test_broken() {
674 assert_eq!(
675 CompatibilityError::MissingUnionElements,
676 SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema())
677 .unwrap_err()
678 )
679 }
680
681 #[test]
682 fn test_incompatible_reader_writer_pairs() {
683 let incompatible_schemas = vec![
684 (Schema::Null, Schema::Int),
686 (Schema::Null, Schema::Long),
687 (Schema::Boolean, Schema::Int),
689 (Schema::Int, Schema::Null),
691 (Schema::Int, Schema::Boolean),
692 (Schema::Int, Schema::Long),
693 (Schema::Int, Schema::Float),
694 (Schema::Int, Schema::Double),
695 (Schema::Long, Schema::Float),
697 (Schema::Long, Schema::Double),
698 (Schema::Float, Schema::Double),
700 (Schema::String, Schema::Boolean),
702 (Schema::String, Schema::Int),
703 (Schema::Bytes, Schema::Null),
705 (Schema::Bytes, Schema::Int),
706 (Schema::TimeMicros, Schema::Int),
708 (Schema::TimestampMillis, Schema::Int),
709 (Schema::TimestampMicros, Schema::Int),
710 (Schema::TimestampNanos, Schema::Int),
711 (Schema::LocalTimestampMillis, Schema::Int),
712 (Schema::LocalTimestampMicros, Schema::Int),
713 (Schema::LocalTimestampNanos, Schema::Int),
714 (Schema::Date, Schema::Long),
715 (Schema::TimeMillis, Schema::Long),
716 (int_array_schema(), long_array_schema()),
718 (int_map_schema(), int_array_schema()),
719 (int_array_schema(), int_map_schema()),
720 (int_map_schema(), long_map_schema()),
721 (enum1_ab_schema(), enum1_abc_schema()),
723 (enum1_bc_schema(), enum1_abc_schema()),
724 (enum1_ab_schema(), enum2_ab_schema()),
725 (Schema::Int, enum2_ab_schema()),
726 (enum2_ab_schema(), Schema::Int),
727 (int_union_schema(), int_string_union_schema()),
729 (string_union_schema(), int_string_union_schema()),
730 (empty_record2_schema(), empty_record1_schema()),
732 (a_int_record1_schema(), empty_record1_schema()),
733 (a_int_b_dint_record1_schema(), empty_record1_schema()),
734 (int_list_record_schema(), long_list_record_schema()),
735 (nested_record(), nested_optional_record()),
736 ];
737
738 assert!(
739 incompatible_schemas
740 .iter()
741 .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())
742 );
743 }
744
745 #[rstest]
746 #[case(
748 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
749 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
750 )]
751 #[case(
753 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
754 r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
755 )]
756 #[case(
758 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
759 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
760 )]
761 #[case(
763 r#"{"type": "map", "values": "int"}"#,
764 r#"{"type": "map", "values": "long"}"#
765 )]
766 #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
768 #[case(
770 r#"{"type": "int"}"#,
771 r#"{"type": "int", "logicalType": "time-millis"}"#
772 )]
773 #[case(
775 r#"{"type": "long"}"#,
776 r#"{"type": "long", "logicalType": "time-micros"}"#
777 )]
778 #[case(
780 r#"{"type": "long"}"#,
781 r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
782 )]
783 #[case(
785 r#"{"type": "long"}"#,
786 r#"{"type": "long", "logicalType": "timestamp-millis"}"#
787 )]
788 #[case(
790 r#"{"type": "long"}"#,
791 r#"{"type": "long", "logicalType": "timestamp-micros"}"#
792 )]
793 #[case(
795 r#"{"type": "long"}"#,
796 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
797 )]
798 #[case(
800 r#"{"type": "long"}"#,
801 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
802 )]
803 #[case(
805 r#"{"type": "long"}"#,
806 r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
807 )]
808 #[case(
810 r#"{"type": "array", "items": "int"}"#,
811 r#"{"type": "array", "items": "long"}"#
812 )]
813 fn test_avro_3950_match_schemas_ok(
814 #[case] writer_schema_str: &str,
815 #[case] reader_schema_str: &str,
816 ) {
817 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
818 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
819
820 assert!(SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).is_ok());
821 }
822
823 #[rstest]
824 #[case(
826 r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
827 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
828 CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
829 )]
830 #[case(
832 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
833 r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
834 CompatibilityError::FixedMismatch
835 )]
836 #[case(
838 r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
839 r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
840 CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
841 )]
842 #[case(
844 r#"{"type":"map", "values": "long"}"#,
845 r#"{"type":"map", "values": "int"}"#,
846 CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
847 SchemaKind::Long,
848 SchemaKind::Float,
849 SchemaKind::Double,
850 SchemaKind::TimeMicros,
851 SchemaKind::TimestampMillis,
852 SchemaKind::TimestampMicros,
853 SchemaKind::TimestampNanos,
854 SchemaKind::LocalTimestampMillis,
855 SchemaKind::LocalTimestampMicros,
856 SchemaKind::LocalTimestampNanos,
857 ]}
858 )]
859 #[case(
861 r#"{"type": "array", "items": "long"}"#,
862 r#"{"type": "array", "items": "int"}"#,
863 CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
864 SchemaKind::Long,
865 SchemaKind::Float,
866 SchemaKind::Double,
867 SchemaKind::TimeMicros,
868 SchemaKind::TimestampMillis,
869 SchemaKind::TimestampMicros,
870 SchemaKind::TimestampNanos,
871 SchemaKind::LocalTimestampMillis,
872 SchemaKind::LocalTimestampMicros,
873 SchemaKind::LocalTimestampNanos,
874 ]}
875 )]
876 #[case(
878 r#"{"type": "string"}"#,
879 r#"{"type": "int", "logicalType": "date"}"#,
880 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
881 SchemaKind::String,
882 SchemaKind::Bytes,
883 SchemaKind::Uuid,
884 ]}
885 )]
886 #[case(
888 r#"{"type": "string"}"#,
889 r#"{"type": "int", "logicalType": "time-millis"}"#,
890 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
891 SchemaKind::String,
892 SchemaKind::Bytes,
893 SchemaKind::Uuid,
894 ]}
895 )]
896 #[case(
898 r#"{"type": "int"}"#,
899 r#"{"type": "long", "logicalType": "time-micros"}"#,
900 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
901 SchemaKind::Int,
902 SchemaKind::Long,
903 SchemaKind::Float,
904 SchemaKind::Double,
905 SchemaKind::Date,
906 SchemaKind::TimeMillis
907 ]}
908 )]
909 #[case(
924 r#"{"type": "int"}"#,
925 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
926 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
927 SchemaKind::Int,
928 SchemaKind::Long,
929 SchemaKind::Float,
930 SchemaKind::Double,
931 SchemaKind::Date,
932 SchemaKind::TimeMillis
933 ]}
934 )]
935 #[case(
937 r#"{"type": "int"}"#,
938 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
939 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
940 SchemaKind::Int,
941 SchemaKind::Long,
942 SchemaKind::Float,
943 SchemaKind::Double,
944 SchemaKind::Date,
945 SchemaKind::TimeMillis
946 ]}
947 )]
948 #[case(
950 r#"{"type": "int"}"#,
951 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
952 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
953 SchemaKind::Int,
954 SchemaKind::Long,
955 SchemaKind::Float,
956 SchemaKind::Double,
957 SchemaKind::Date,
958 SchemaKind::TimeMillis
959 ]}
960 )]
961 #[case(
963 r#"{"type": "int"}"#,
964 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
965 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
966 SchemaKind::Int,
967 SchemaKind::Long,
968 SchemaKind::Float,
969 SchemaKind::Double,
970 SchemaKind::Date,
971 SchemaKind::TimeMillis
972 ]}
973 )]
974 #[case(
989 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
990 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
991 CompatibilityError::Inconclusive(String::from("writers_schema"))
992 )]
993 fn test_avro_3950_match_schemas_error(
994 #[case] writer_schema_str: &str,
995 #[case] reader_schema_str: &str,
996 #[case] expected_error: CompatibilityError,
997 ) {
998 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
999 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
1000
1001 assert_eq!(
1002 expected_error,
1003 SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).unwrap_err()
1004 )
1005 }
1006
1007 #[test]
1008 fn test_compatible_reader_writer_pairs() {
1009 let compatible_schemas = vec![
1010 (Schema::Null, Schema::Null),
1011 (Schema::Long, Schema::Int),
1012 (Schema::Float, Schema::Int),
1013 (Schema::Float, Schema::Long),
1014 (Schema::Double, Schema::Long),
1015 (Schema::Double, Schema::Int),
1016 (Schema::Double, Schema::Float),
1017 (Schema::String, Schema::Bytes),
1018 (Schema::Bytes, Schema::String),
1019 (Schema::Uuid, Schema::Uuid),
1021 (Schema::Uuid, Schema::String),
1022 (Schema::Date, Schema::Int),
1023 (Schema::TimeMillis, Schema::Int),
1024 (Schema::TimeMicros, Schema::Long),
1025 (Schema::TimestampMillis, Schema::Long),
1026 (Schema::TimestampMicros, Schema::Long),
1027 (Schema::TimestampNanos, Schema::Long),
1028 (Schema::LocalTimestampMillis, Schema::Long),
1029 (Schema::LocalTimestampMicros, Schema::Long),
1030 (Schema::LocalTimestampNanos, Schema::Long),
1031 (Schema::String, Schema::Uuid),
1032 (Schema::Int, Schema::Date),
1033 (Schema::Int, Schema::TimeMillis),
1034 (Schema::Long, Schema::TimeMicros),
1035 (Schema::Long, Schema::TimestampMillis),
1036 (Schema::Long, Schema::TimestampMicros),
1037 (Schema::Long, Schema::TimestampNanos),
1038 (Schema::Long, Schema::LocalTimestampMillis),
1039 (Schema::Long, Schema::LocalTimestampMicros),
1040 (Schema::Long, Schema::LocalTimestampNanos),
1041 (int_array_schema(), int_array_schema()),
1042 (long_array_schema(), int_array_schema()),
1043 (int_map_schema(), int_map_schema()),
1044 (long_map_schema(), int_map_schema()),
1045 (enum1_ab_schema(), enum1_ab_schema()),
1046 (enum1_abc_schema(), enum1_ab_schema()),
1047 (empty_union_schema(), empty_union_schema()),
1048 (int_union_schema(), int_union_schema()),
1049 (int_string_union_schema(), string_int_union_schema()),
1050 (int_union_schema(), empty_union_schema()),
1051 (long_union_schema(), int_union_schema()),
1052 (int_union_schema(), Schema::Int),
1053 (Schema::Int, int_union_schema()),
1054 (empty_record1_schema(), empty_record1_schema()),
1055 (empty_record1_schema(), a_int_record1_schema()),
1056 (a_int_record1_schema(), a_int_record1_schema()),
1057 (a_dint_record1_schema(), a_int_record1_schema()),
1058 (a_dint_record1_schema(), a_dint_record1_schema()),
1059 (a_int_record1_schema(), a_dint_record1_schema()),
1060 (a_long_record1_schema(), a_int_record1_schema()),
1061 (a_int_record1_schema(), a_int_b_int_record1_schema()),
1062 (a_dint_record1_schema(), a_int_b_int_record1_schema()),
1063 (a_int_b_dint_record1_schema(), a_int_record1_schema()),
1064 (a_dint_b_dint_record1_schema(), empty_record1_schema()),
1065 (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
1066 (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
1067 (int_list_record_schema(), int_list_record_schema()),
1068 (long_list_record_schema(), long_list_record_schema()),
1069 (long_list_record_schema(), int_list_record_schema()),
1070 (nested_optional_record(), nested_record()),
1071 ];
1072
1073 assert!(
1074 compatible_schemas
1075 .iter()
1076 .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_ok())
1077 );
1078 }
1079
1080 fn writer_schema() -> Schema {
1081 Schema::parse_str(
1082 r#"
1083 {"type":"record", "name":"Record", "fields":[
1084 {"name":"oldfield1", "type":"int"},
1085 {"name":"oldfield2", "type":"string"}
1086 ]}
1087"#,
1088 )
1089 .unwrap()
1090 }
1091
1092 #[test]
1093 fn test_missing_field() -> TestResult {
1094 let reader_schema = Schema::parse_str(
1095 r#"
1096 {"type":"record", "name":"Record", "fields":[
1097 {"name":"oldfield1", "type":"int"}
1098 ]}
1099"#,
1100 )?;
1101 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok());
1102 assert_eq!(
1103 CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1104 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1105 );
1106
1107 Ok(())
1108 }
1109
1110 #[test]
1111 fn test_missing_second_field() -> TestResult {
1112 let reader_schema = Schema::parse_str(
1113 r#"
1114 {"type":"record", "name":"Record", "fields":[
1115 {"name":"oldfield2", "type":"string"}
1116 ]}
1117"#,
1118 )?;
1119 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1120 assert_eq!(
1121 CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
1122 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1123 );
1124
1125 Ok(())
1126 }
1127
1128 #[test]
1129 fn test_all_fields() -> TestResult {
1130 let reader_schema = Schema::parse_str(
1131 r#"
1132 {"type":"record", "name":"Record", "fields":[
1133 {"name":"oldfield1", "type":"int"},
1134 {"name":"oldfield2", "type":"string"}
1135 ]}
1136"#,
1137 )?;
1138 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1139 assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok());
1140
1141 Ok(())
1142 }
1143
/// A reader adding `newfield1` *with* a default can read the old writer.
/// The reverse still fails because `oldfield2` is required but has no default.
#[test]
fn test_new_field_with_default() -> TestResult {
    let missing = |field: &str| CompatibilityError::MissingDefaultValue(String::from(field));

    let reader = Schema::parse_str(
        r#"
    {"type":"record", "name":"Record", "fields":[
      {"name":"oldfield1", "type":"int"},
      {"name":"newfield1", "type":"int", "default":42}
    ]}
"#,
    )?;

    // `newfield1` is absent from the writer but is filled from its default.
    let forward = SchemaCompatibility::can_read(&writer_schema(), &reader);
    assert!(forward.is_ok());

    let backward = SchemaCompatibility::can_read(&reader, &writer_schema());
    assert_eq!(missing("oldfield2"), backward.unwrap_err());

    Ok(())
}
1162
/// A reader adding `newfield1` *without* a default is incompatible in both
/// directions: each side has a field the other cannot supply.
#[test]
fn test_new_field() -> TestResult {
    let missing = |field: &str| CompatibilityError::MissingDefaultValue(String::from(field));

    let reader = Schema::parse_str(
        r#"
    {"type":"record", "name":"Record", "fields":[
      {"name":"oldfield1", "type":"int"},
      {"name":"newfield1", "type":"int"}
    ]}
"#,
    )?;

    let forward_err = SchemaCompatibility::can_read(&writer_schema(), &reader).unwrap_err();
    assert_eq!(missing("newfield1"), forward_err);

    let backward_err = SchemaCompatibility::can_read(&reader, &writer_schema()).unwrap_err();
    assert_eq!(missing("oldfield2"), backward_err);

    Ok(())
}
1184
/// An array-of-string writer is readable by an identical array reader; a
/// map-of-string reader is rejected with `Inconclusive("writers_schema")`.
#[test]
fn test_array_writer_schema() {
    let array_reader = string_array_schema();
    let map_reader = string_map_schema();

    let accepted = SchemaCompatibility::can_read(&string_array_schema(), &array_reader);
    assert!(accepted.is_ok());

    let rejected = SchemaCompatibility::can_read(&string_array_schema(), &map_reader);
    assert_eq!(
        CompatibilityError::Inconclusive(String::from("writers_schema")),
        rejected.unwrap_err()
    );
}
1196
/// Identical primitive schemas are compatible. An `int` writer paired with a
/// `string` reader yields `TypeExpected`, listing the reader kinds the checker
/// would have accepted for that writer.
#[test]
fn test_primitive_writer_schema() {
    let string_reader = Schema::String;
    assert!(SchemaCompatibility::can_read(&Schema::String, &string_reader).is_ok());

    let accepted_reader_kinds = vec![
        SchemaKind::Int,
        SchemaKind::Long,
        SchemaKind::Float,
        SchemaKind::Double,
        SchemaKind::Date,
        SchemaKind::TimeMillis,
    ];
    let expected = CompatibilityError::TypeExpected {
        schema_type: String::from("readers_schema"),
        expected_type: accepted_reader_kinds,
    };

    let actual = SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err();
    assert_eq!(expected, actual);
}
1216
/// A writer union `[int, string]` cannot be read through the narrower reader
/// union `[string]` (the `int` branch has no counterpart). The narrower union
/// can be read by the wider one.
#[test]
fn test_union_reader_writer_subset_incompatibility() {
    let wide = union_schema(vec![Schema::Int, Schema::String]);
    let narrow = union_schema(vec![Schema::String]);

    let wide_into_narrow = SchemaCompatibility::can_read(&wide, &narrow);
    assert_eq!(
        CompatibilityError::MissingUnionElements,
        wide_into_narrow.unwrap_err()
    );

    let narrow_into_wide = SchemaCompatibility::can_read(&narrow, &wide);
    assert!(narrow_into_wide.is_ok());
}
1229
/// Two `ns.MyRecord` schemas that differ only in the type of `field1`
/// (`string` writer vs `int` reader). The failure is reported as a
/// `FieldTypeMismatch` for `field1` that wraps the underlying `TypeExpected`.
#[test]
fn test_incompatible_record_field() -> TestResult {
    let string_field = Schema::parse_str(
        r#"
    {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
      {"name":"field1", "type":"string"}
    ]}
    "#,
    )?;
    let int_field = Schema::parse_str(
        r#"
    {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
      {"name":"field1", "type":"int"}
    ]}
    "#,
    )?;

    let cause = CompatibilityError::TypeExpected {
        schema_type: "readers_schema".to_owned(),
        expected_type: vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Uuid],
    };
    let expected = CompatibilityError::FieldTypeMismatch("field1".to_owned(), Box::new(cause));

    let actual = SchemaCompatibility::can_read(&string_field, &int_field).unwrap_err();
    assert_eq!(expected, actual);

    Ok(())
}
1261
/// An enum writer with symbols `[A, B, C]` cannot be read by a reader that
/// only knows `[A, B]` (no enum default is declared). The reverse direction
/// is compatible.
#[test]
fn test_enum_symbols() -> TestResult {
    let two_symbols = Schema::parse_str(
        r#"
    {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
"#,
    )?;
    let three_symbols =
        Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;

    let err = SchemaCompatibility::can_read(&three_symbols, &two_symbols).unwrap_err();
    assert_eq!(CompatibilityError::MissingSymbols, err);

    let ok = SchemaCompatibility::can_read(&two_symbols, &three_symbols);
    assert!(ok.is_ok());

    Ok(())
}
1279
1280 fn point_2d_schema() -> Schema {
1281 Schema::parse_str(
1282 r#"
1283 {"type":"record", "name":"Point2D", "fields":[
1284 {"name":"x", "type":"double"},
1285 {"name":"y", "type":"double"}
1286 ]}
1287 "#,
1288 )
1289 .unwrap()
1290 }
1291
1292 fn point_2d_fullname_schema() -> Schema {
1293 Schema::parse_str(
1294 r#"
1295 {"type":"record", "name":"Point", "namespace":"written", "fields":[
1296 {"name":"x", "type":"double"},
1297 {"name":"y", "type":"double"}
1298 ]}
1299 "#,
1300 )
1301 .unwrap()
1302 }
1303
1304 fn point_3d_no_default_schema() -> Schema {
1305 Schema::parse_str(
1306 r#"
1307 {"type":"record", "name":"Point", "fields":[
1308 {"name":"x", "type":"double"},
1309 {"name":"y", "type":"double"},
1310 {"name":"z", "type":"double"}
1311 ]}
1312 "#,
1313 )
1314 .unwrap()
1315 }
1316
1317 fn point_3d_schema() -> Schema {
1318 Schema::parse_str(
1319 r#"
1320 {"type":"record", "name":"Point3D", "fields":[
1321 {"name":"x", "type":"double"},
1322 {"name":"y", "type":"double"},
1323 {"name":"z", "type":"double", "default": 0.0}
1324 ]}
1325 "#,
1326 )
1327 .unwrap()
1328 }
1329
1330 fn point_3d_match_name_schema() -> Schema {
1331 Schema::parse_str(
1332 r#"
1333 {"type":"record", "name":"Point", "fields":[
1334 {"name":"x", "type":"double"},
1335 {"name":"y", "type":"double"},
1336 {"name":"z", "type":"double", "default": 0.0}
1337 ]}
1338 "#,
1339 )
1340 .unwrap()
1341 }
1342
/// The `written.Point` writer against a reader union `[null, Point]` whose
/// `Point` branch lacks a default for `z`: no branch accepts the writer.
#[test]
fn test_union_resolution_no_structure_match() {
    let reader = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1352
/// The reader union contains `Point2D` before `Point3D`. Both are structurally
/// able to hold `x`/`y`, but neither is accepted for the `written.Point`
/// writer, so the check reports `MissingUnionElements`.
#[test]
fn test_union_resolution_first_structure_match_2d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_2d_schema(),
        point_3d_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1367
/// Same as the 2D variant but with `Point3D` listed before `Point2D`; branch
/// order does not change the outcome (`MissingUnionElements`).
#[test]
fn test_union_resolution_first_structure_match_3d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_3d_schema(),
        point_2d_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1382
/// The reader union includes an un-namespaced `Point` branch that has a
/// default for `z`. The `written.Point` writer is still expected to be
/// rejected with `MissingUnionElements`.
#[test]
fn test_union_resolution_named_structure_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1397
/// Once the reader union contains the writer's own `written.Point` schema,
/// resolution succeeds.
#[test]
fn test_union_resolution_full_name_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
        point_2d_fullname_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    let outcome = SchemaCompatibility::can_read(&writer, &reader);
    assert!(outcome.is_ok());
}
1410
/// AVRO-3772: a record written with enum symbol `clubs`, which the reader's
/// `suit` enum does not declare, must be read back as the reader's enum
/// default `spades` (index 1 in the reader's symbol list).
#[test]
fn test_avro_3772_enum_default() -> TestResult {
    let writer_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "clubs", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;

    let reader_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "ninja", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
    // `Record::new` yields `None` only for non-record schemas; ours is a record.
    let mut record = Record::new(writer.schema()).expect("writer schema is a record schema");
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "clubs");
    // Propagate I/O / encoding failures through `TestResult` instead of panicking.
    writer.append(record)?;
    let input = writer.into_inner()?;
    let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
    let read_back = reader.next().expect("exactly one record was written")?;
    assert_eq!(
        read_back,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(1, "spades".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1474
/// AVRO-3772: enum symbols are resolved by name, not position. `hearts` sits
/// at index 3 in the writer's `suit` enum but must read back as index 0 of the
/// reader's shorter `["hearts", "spades"]` enum.
#[test]
fn test_avro_3772_enum_default_less_symbols() -> TestResult {
    let writer_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["diamonds", "spades", "clubs", "hearts"],
            "default": "spades"
          }
        }
      ]
    }
    "#;

    let reader_raw_schema = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {"name": "a", "type": "long", "default": 42},
        {"name": "b", "type": "string"},
        {
          "name": "c",
          "type": {
            "type": "enum",
            "name": "suit",
            "symbols": ["hearts", "spades"],
            "default": "spades"
          }
        }
      ]
    }
    "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
    // `Record::new` yields `None` only for non-record schemas; ours is a record.
    let mut record = Record::new(writer.schema()).expect("writer schema is a record schema");
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "hearts");
    // Propagate I/O / encoding failures through `TestResult` instead of panicking.
    writer.append(record)?;
    let input = writer.into_inner()?;
    let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
    let read_back = reader.next().expect("exactly one record was written")?;
    assert_eq!(
        read_back,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(0, "hearts".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1538
/// AVRO-3894: two `advdaba.Conference` schemas that differ only by an alias
/// (`time`) on the `date` field must be mutually readable.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
{
    let without_alias = Schema::parse_str(
        r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "date"}
      ]
    }"#,
    )?;
    let with_alias = Schema::parse_str(
        r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "date", "aliases" : [ "time" ]}
      ]
    }"#,
    )?;

    let mutual = SchemaCompatibility::mutual_read(&without_alias, &with_alias);
    assert!(mutual.is_ok());

    Ok(())
}
1572
/// AVRO-3917: field aliases are consulted for the reader's fields. A reader
/// field `date` aliased as `time` can read a writer field named `time`. The
/// reverse fails, because a reader field `time` finds no match and has no default.
#[test]
fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
    let aliased_date = Schema::parse_str(
        r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "date", "aliases" : [ "time" ]}
      ]
    }"#,
    )?;
    let plain_time = Schema::parse_str(
        r#"
    {
      "type": "record",
      "name": "Conference",
      "namespace": "advdaba",
      "fields": [
        {"type": "string", "name": "name"},
        {"type": "long", "name": "time"}
      ]
    }"#,
    )?;

    let via_alias = SchemaCompatibility::can_read(&plain_time, &aliased_date);
    assert!(via_alias.is_ok());

    let no_alias = SchemaCompatibility::can_read(&aliased_date, &plain_time);
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("time")),
        no_alias.unwrap_err()
    );

    Ok(())
}
1609
/// AVRO-3898: named schemas (record, enum, fixed) with the same short name are
/// considered compatible even when only the reader declares a namespace.
#[test]
fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
    // Record: reader drops `max` and adds `average`, both with defaults.
    let records = (
        Schema::parse_str(
            r#"{
        "type": "record",
        "name": "Statistics",
        "fields": [
          { "name": "success", "type": "int" },
          { "name": "fail", "type": "int" },
          { "name": "time", "type": "string" },
          { "name": "max", "type": "int", "default": 0 }
        ]
      }"#,
        )?,
        Schema::parse_str(
            r#"{
        "type": "record",
        "name": "Statistics",
        "namespace": "my.namespace",
        "fields": [
          { "name": "success", "type": "int" },
          { "name": "fail", "type": "int" },
          { "name": "time", "type": "string" },
          { "name": "average", "type": "int", "default": 0}
        ]
      }"#,
        )?,
    );

    // Enum: reader's symbols are a superset of the writer's.
    let enums = (
        Schema::parse_str(
            r#"{
        "type": "enum",
        "name": "Suit",
        "symbols": ["diamonds", "spades", "clubs"]
      }"#,
        )?,
        Schema::parse_str(
            r#"{
        "type": "enum",
        "name": "Suit",
        "namespace": "my.namespace",
        "symbols": ["diamonds", "spades", "clubs", "hearts"]
      }"#,
        )?,
    );

    // Fixed: identical size.
    let fixeds = (
        Schema::parse_str(
            r#"{
        "type": "fixed",
        "name": "EmployeeId",
        "size": 16
      }"#,
        )?,
        Schema::parse_str(
            r#"{
        "type": "fixed",
        "name": "EmployeeId",
        "namespace": "my.namespace",
        "size": 16
      }"#,
        )?,
    );

    for (writer, reader) in [records, enums, fixeds] {
        assert!(SchemaCompatibility::can_read(&writer, &reader).is_ok());
    }

    Ok(())
}
1685
/// A non-nullable field can be read into a `["null", T]` union field, but the
/// nullable form cannot be read back into the plain field. Checks the exact
/// `Display` text of the resulting error for a map field and an array field.
#[test]
fn test_can_read_compatibility_errors() -> TestResult {
    let map_case = (
        Schema::parse_str(
            r#"{
        "type": "record",
        "name": "StatisticsMap",
        "fields": [
          {"name": "average", "type": "int", "default": 0},
          {"name": "success", "type": {"type": "map", "values": "int"}}
        ]
      }"#,
        )?,
        Schema::parse_str(
            r#"{
        "type": "record",
        "name": "StatisticsMap",
        "fields": [
          {"name": "average", "type": "int", "default": 0},
          {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
        ]
      }"#,
        )?,
        "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema",
    );
    let array_case = (
        Schema::parse_str(
            r#"{
        "type": "record",
        "name": "StatisticsArray",
        "fields": [
          {"name": "max_values", "type": {"type": "array", "items": "int"}}
        ]
      }"#,
        )?,
        Schema::parse_str(
            r#"{
        "type": "record",
        "name": "StatisticsArray",
        "fields": [
          {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
        ]
      }"#,
        )?,
        "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema",
    );

    for (plain, nullable, expected_message) in [map_case, array_case] {
        // plain writer -> nullable reader: accepted.
        assert!(SchemaCompatibility::can_read(&plain, &nullable).is_ok());
        // nullable writer -> plain reader: rejected with a specific message.
        let message = SchemaCompatibility::can_read(&nullable, &plain)
            .unwrap_err()
            .to_string();
        assert_eq!(expected_message, message);
    }

    Ok(())
}
1747
/// AVRO-3974: a schema whose field refers to another named schema by name
/// (`Parent.child` has type `"avro.Child"`) must be compatible with itself.
/// The two schemas are parsed together so the reference can be resolved.
#[test]
fn avro_3974_can_read_schema_references() -> TestResult {
    let schema_strs = vec![
        r#"{
      "type": "record",
      "name": "Child",
      "namespace": "avro",
      "fields": [
        {
          "name": "val",
          "type": "int"
        }
      ]
    }
    "#,
        r#"{
      "type": "record",
      "name": "Parent",
      "namespace": "avro",
      "fields": [
        {
          "name": "child",
          "type": "avro.Child"
        }
      ]
    }
    "#,
    ];

    // Propagate parse failures through `TestResult`, like every other parse here.
    let schemas = Schema::parse_list(schema_strs)?;
    // Index 1 is expected to be `Parent` (assumes parse_list keeps input order).
    SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;

    Ok(())
}
1782}