1use crate::{
20 error::CompatibilityError,
21 schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
22};
23use std::{
24 collections::{hash_map::DefaultHasher, HashSet},
25 hash::Hasher,
26 ptr,
27};
28
29fn match_ref_schemas(
30 writers_schema: &Schema,
31 readers_schema: &Schema,
32) -> Result<(), CompatibilityError> {
33 match (readers_schema, writers_schema) {
34 (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
35 if r_name == w_name {
36 Ok(())
37 } else {
38 Err(CompatibilityError::NameMismatch {
39 writer_name: w_name.fullname(None),
40 reader_name: r_name.fullname(None),
41 })
42 }
43 }
44 _ => Err(CompatibilityError::WrongType {
45 writer_schema_type: format!("{:#?}", writers_schema),
46 reader_schema_type: format!("{:#?}", readers_schema),
47 }),
48 }
49}
50
/// Namespace for Avro schema-compatibility checks.
///
/// Use `SchemaCompatibility::can_read` to check one direction (writer → reader)
/// and `SchemaCompatibility::mutual_read` to check both directions.
pub struct SchemaCompatibility;
52
53struct Checker {
54 recursion: HashSet<(u64, u64)>,
55}
56
57impl Checker {
58 pub(crate) fn new() -> Self {
60 Self {
61 recursion: HashSet::new(),
62 }
63 }
64
65 pub(crate) fn can_read(
66 &mut self,
67 writers_schema: &Schema,
68 readers_schema: &Schema,
69 ) -> Result<(), CompatibilityError> {
70 self.full_match_schemas(writers_schema, readers_schema)
71 }
72
73 pub(crate) fn full_match_schemas(
74 &mut self,
75 writers_schema: &Schema,
76 readers_schema: &Schema,
77 ) -> Result<(), CompatibilityError> {
78 if self.recursion_in_progress(writers_schema, readers_schema) {
79 return Ok(());
80 }
81
82 SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
83
84 let w_type = SchemaKind::from(writers_schema);
85 let r_type = SchemaKind::from(readers_schema);
86
87 if w_type != SchemaKind::Union
88 && (r_type.is_primitive()
89 || r_type == SchemaKind::Fixed
90 || r_type == SchemaKind::Uuid
91 || r_type == SchemaKind::Date
92 || r_type == SchemaKind::TimeMillis
93 || r_type == SchemaKind::TimeMicros
94 || r_type == SchemaKind::TimestampMillis
95 || r_type == SchemaKind::TimestampMicros
96 || r_type == SchemaKind::TimestampNanos
97 || r_type == SchemaKind::LocalTimestampMillis
98 || r_type == SchemaKind::LocalTimestampMicros
99 || r_type == SchemaKind::LocalTimestampNanos)
100 {
101 return Ok(());
102 }
103
104 match r_type {
105 SchemaKind::Ref => match_ref_schemas(writers_schema, readers_schema),
106 SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema),
107 SchemaKind::Map => {
108 if let Schema::Map(w_m) = writers_schema {
109 match readers_schema {
110 Schema::Map(r_m) => self.full_match_schemas(&w_m.types, &r_m.types),
111 _ => Err(CompatibilityError::WrongType {
112 writer_schema_type: format!("{:#?}", writers_schema),
113 reader_schema_type: format!("{:#?}", readers_schema),
114 }),
115 }
116 } else {
117 Err(CompatibilityError::TypeExpected {
118 schema_type: String::from("writers_schema"),
119 expected_type: vec![SchemaKind::Record],
120 })
121 }
122 }
123 SchemaKind::Array => {
124 if let Schema::Array(w_a) = writers_schema {
125 match readers_schema {
126 Schema::Array(r_a) => self.full_match_schemas(&w_a.items, &r_a.items),
127 _ => Err(CompatibilityError::WrongType {
128 writer_schema_type: format!("{:#?}", writers_schema),
129 reader_schema_type: format!("{:#?}", readers_schema),
130 }),
131 }
132 } else {
133 Err(CompatibilityError::TypeExpected {
134 schema_type: String::from("writers_schema"),
135 expected_type: vec![SchemaKind::Array],
136 })
137 }
138 }
139 SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema),
140 SchemaKind::Enum => {
141 if let Schema::Enum(EnumSchema {
143 symbols: w_symbols, ..
144 }) = writers_schema
145 {
146 if let Schema::Enum(EnumSchema {
147 symbols: r_symbols, ..
148 }) = readers_schema
149 {
150 if w_symbols.iter().all(|e| r_symbols.contains(e)) {
151 return Ok(());
152 }
153 }
154 }
155 Err(CompatibilityError::MissingSymbols)
156 }
157 _ => {
158 if w_type == SchemaKind::Union {
159 if let Schema::Union(r) = writers_schema {
160 if r.schemas.len() == 1 {
161 return self.full_match_schemas(&r.schemas[0], readers_schema);
162 }
163 }
164 }
165 Err(CompatibilityError::Inconclusive(String::from(
166 "writers_schema",
167 )))
168 }
169 }
170 }
171
172 fn match_record_schemas(
173 &mut self,
174 writers_schema: &Schema,
175 readers_schema: &Schema,
176 ) -> Result<(), CompatibilityError> {
177 let w_type = SchemaKind::from(writers_schema);
178
179 if w_type == SchemaKind::Union {
180 return Err(CompatibilityError::TypeExpected {
181 schema_type: String::from("writers_schema"),
182 expected_type: vec![SchemaKind::Record],
183 });
184 }
185
186 if let Schema::Record(RecordSchema {
187 fields: w_fields,
188 lookup: w_lookup,
189 ..
190 }) = writers_schema
191 {
192 if let Schema::Record(RecordSchema {
193 fields: r_fields, ..
194 }) = readers_schema
195 {
196 for field in r_fields.iter() {
197 let mut fields_names = vec![&field.name];
199 if let Some(ref aliases) = field.aliases {
200 for alias in aliases {
201 fields_names.push(alias);
202 }
203 }
204
205 let position = fields_names.iter().find_map(|field_name| {
209 if let Some(pos) = w_lookup.get(*field_name) {
210 if &w_fields[*pos].name == *field_name {
211 return Some(pos);
212 }
213 }
214 None
215 });
216
217 match position {
218 Some(pos) => {
219 if let Err(err) =
220 self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
221 {
222 return Err(CompatibilityError::FieldTypeMismatch(
223 field.name.clone(),
224 Box::new(err),
225 ));
226 }
227 }
228 _ => {
229 if field.default.is_none() {
230 return Err(CompatibilityError::MissingDefaultValue(
231 field.name.clone(),
232 ));
233 }
234 }
235 }
236 }
237 }
238 }
239 Ok(())
240 }
241
242 fn match_union_schemas(
243 &mut self,
244 writers_schema: &Schema,
245 readers_schema: &Schema,
246 ) -> Result<(), CompatibilityError> {
247 if let Schema::Union(u) = writers_schema {
248 if u.schemas
249 .iter()
250 .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok())
251 {
252 return Ok(());
253 } else {
254 return Err(CompatibilityError::MissingUnionElements);
255 }
256 } else if let Schema::Union(u) = readers_schema {
257 if u.schemas
261 .iter()
262 .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok())
263 {
264 return Ok(());
265 }
266 }
267 Err(CompatibilityError::MissingUnionElements)
268 }
269
270 fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool {
271 let mut hasher = DefaultHasher::new();
272 ptr::hash(writers_schema, &mut hasher);
273 let w_hash = hasher.finish();
274
275 hasher = DefaultHasher::new();
276 ptr::hash(readers_schema, &mut hasher);
277 let r_hash = hasher.finish();
278
279 let key = (w_hash, r_hash);
280 !self.recursion.insert(key)
283 }
284}
285
286impl SchemaCompatibility {
287 pub fn can_read(
290 writers_schema: &Schema,
291 readers_schema: &Schema,
292 ) -> Result<(), CompatibilityError> {
293 let mut c = Checker::new();
294 c.can_read(writers_schema, readers_schema)
295 }
296
297 pub fn mutual_read(
300 writers_schema: &Schema,
301 readers_schema: &Schema,
302 ) -> Result<(), CompatibilityError> {
303 SchemaCompatibility::can_read(writers_schema, readers_schema)?;
304 SchemaCompatibility::can_read(readers_schema, writers_schema)
305 }
306
307 pub(crate) fn match_schemas(
313 writers_schema: &Schema,
314 readers_schema: &Schema,
315 ) -> Result<(), CompatibilityError> {
316 fn check_reader_type_multi(
317 reader_type: SchemaKind,
318 allowed_reader_types: Vec<SchemaKind>,
319 writer_type: SchemaKind,
320 ) -> Result<(), CompatibilityError> {
321 if allowed_reader_types.iter().any(|&t| t == reader_type) {
322 Ok(())
323 } else {
324 let mut allowed_types: Vec<SchemaKind> = vec![writer_type];
325 allowed_types.extend_from_slice(allowed_reader_types.as_slice());
326 Err(CompatibilityError::TypeExpected {
327 schema_type: String::from("readers_schema"),
328 expected_type: allowed_types,
329 })
330 }
331 }
332
333 fn check_reader_type(
334 reader_type: SchemaKind,
335 allowed_reader_type: SchemaKind,
336 writer_type: SchemaKind,
337 ) -> Result<(), CompatibilityError> {
338 if reader_type == allowed_reader_type {
339 Ok(())
340 } else {
341 Err(CompatibilityError::TypeExpected {
342 schema_type: String::from("readers_schema"),
343 expected_type: vec![writer_type, allowed_reader_type],
344 })
345 }
346 }
347
348 fn check_writer_type(
349 writers_schema: &Schema,
350 allowed_schema: &Schema,
351 expected_schema_types: Vec<SchemaKind>,
352 ) -> Result<(), CompatibilityError> {
353 if *allowed_schema == *writers_schema {
354 Ok(())
355 } else {
356 Err(CompatibilityError::TypeExpected {
357 schema_type: String::from("writers_schema"),
358 expected_type: expected_schema_types,
359 })
360 }
361 }
362
363 let w_type = SchemaKind::from(writers_schema);
364 let r_type = SchemaKind::from(readers_schema);
365
366 if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
367 return Ok(());
368 }
369
370 if w_type == r_type {
371 if r_type.is_primitive() {
372 return Ok(());
373 }
374
375 match r_type {
376 SchemaKind::Record | SchemaKind::Enum => {
377 let msg = format!("A {} type must always has a name", readers_schema);
378 let writers_name = writers_schema.name().expect(&msg);
379 let readers_name = readers_schema.name().expect(&msg);
380
381 if writers_name.name == readers_name.name {
382 return Ok(());
383 }
384
385 return Err(CompatibilityError::NameMismatch {
386 writer_name: writers_name.name.clone(),
387 reader_name: readers_name.name.clone(),
388 });
389 }
390 SchemaKind::Fixed => {
391 if let Schema::Fixed(FixedSchema {
392 name: w_name,
393 aliases: _,
394 doc: _w_doc,
395 size: w_size,
396 default: _w_default,
397 attributes: _,
398 }) = writers_schema
399 {
400 if let Schema::Fixed(FixedSchema {
401 name: r_name,
402 aliases: _,
403 doc: _r_doc,
404 size: r_size,
405 default: _r_default,
406 attributes: _,
407 }) = readers_schema
408 {
409 return (w_name.name == r_name.name && w_size == r_size)
410 .then_some(())
411 .ok_or(CompatibilityError::FixedMismatch);
412 }
413 }
414 }
415 SchemaKind::Map => {
416 if let Schema::Map(w_m) = writers_schema {
417 if let Schema::Map(r_m) = readers_schema {
418 return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
419 }
420 }
421 }
422 SchemaKind::Array => {
423 if let Schema::Array(w_a) = writers_schema {
424 if let Schema::Array(r_a) = readers_schema {
425 return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
426 }
427 }
428 }
429 SchemaKind::Uuid => {
430 return check_writer_type(
431 writers_schema,
432 readers_schema,
433 vec![r_type, SchemaKind::String],
434 );
435 }
436 SchemaKind::Date | SchemaKind::TimeMillis => {
437 return check_writer_type(
438 writers_schema,
439 readers_schema,
440 vec![r_type, SchemaKind::Int],
441 );
442 }
443 SchemaKind::TimeMicros
444 | SchemaKind::TimestampNanos
445 | SchemaKind::TimestampMillis
446 | SchemaKind::TimestampMicros
447 | SchemaKind::LocalTimestampMillis
448 | SchemaKind::LocalTimestampMicros
449 | SchemaKind::LocalTimestampNanos => {
450 return check_writer_type(
451 writers_schema,
452 readers_schema,
453 vec![r_type, SchemaKind::Long],
454 );
455 }
456 SchemaKind::Duration => {
457 return Ok(());
458 }
459 SchemaKind::Ref => return match_ref_schemas(writers_schema, readers_schema),
460 _ => {
461 return Err(CompatibilityError::Inconclusive(String::from(
462 "readers_schema",
463 )))
464 }
465 };
466 }
467
468 match w_type {
470 SchemaKind::Int => check_reader_type_multi(
471 r_type,
472 vec![
473 SchemaKind::Long,
474 SchemaKind::Float,
475 SchemaKind::Double,
476 SchemaKind::Date,
477 SchemaKind::TimeMillis,
478 ],
479 w_type,
480 ),
481 SchemaKind::Long => check_reader_type_multi(
482 r_type,
483 vec![
484 SchemaKind::Float,
485 SchemaKind::Double,
486 SchemaKind::TimeMicros,
487 SchemaKind::TimestampMillis,
488 SchemaKind::TimestampMicros,
489 SchemaKind::TimestampNanos,
490 SchemaKind::LocalTimestampMillis,
491 SchemaKind::LocalTimestampMicros,
492 SchemaKind::LocalTimestampNanos,
493 ],
494 w_type,
495 ),
496 SchemaKind::Float => {
497 check_reader_type_multi(r_type, vec![SchemaKind::Float, SchemaKind::Double], w_type)
498 }
499 SchemaKind::String => {
500 check_reader_type_multi(r_type, vec![SchemaKind::Bytes, SchemaKind::Uuid], w_type)
501 }
502 SchemaKind::Bytes => check_reader_type(r_type, SchemaKind::String, w_type),
503 SchemaKind::Uuid => check_reader_type(r_type, SchemaKind::String, w_type),
504 SchemaKind::Date | SchemaKind::TimeMillis => {
505 check_reader_type(r_type, SchemaKind::Int, w_type)
506 }
507 SchemaKind::TimeMicros
508 | SchemaKind::TimestampMicros
509 | SchemaKind::TimestampMillis
510 | SchemaKind::TimestampNanos
511 | SchemaKind::LocalTimestampMillis
512 | SchemaKind::LocalTimestampMicros
513 | SchemaKind::LocalTimestampNanos => {
514 check_reader_type(r_type, SchemaKind::Long, w_type)
515 }
516 _ => Err(CompatibilityError::Inconclusive(String::from(
517 "writers_schema",
518 ))),
519 }
520 }
521}
522
523#[cfg(test)]
524mod tests {
525 use super::*;
526 use crate::{
527 types::{Record, Value},
528 Codec, Reader, Writer,
529 };
530 use apache_avro_test_helper::TestResult;
531 use rstest::*;
532
533 fn int_array_schema() -> Schema {
534 Schema::parse_str(r#"{"type":"array", "items":"int"}"#).unwrap()
535 }
536
537 fn long_array_schema() -> Schema {
538 Schema::parse_str(r#"{"type":"array", "items":"long"}"#).unwrap()
539 }
540
541 fn string_array_schema() -> Schema {
542 Schema::parse_str(r#"{"type":"array", "items":"string"}"#).unwrap()
543 }
544
545 fn int_map_schema() -> Schema {
546 Schema::parse_str(r#"{"type":"map", "values":"int"}"#).unwrap()
547 }
548
549 fn long_map_schema() -> Schema {
550 Schema::parse_str(r#"{"type":"map", "values":"long"}"#).unwrap()
551 }
552
553 fn string_map_schema() -> Schema {
554 Schema::parse_str(r#"{"type":"map", "values":"string"}"#).unwrap()
555 }
556
557 fn enum1_ab_schema() -> Schema {
558 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B"]}"#).unwrap()
559 }
560
561 fn enum1_abc_schema() -> Schema {
562 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["A","B","C"]}"#).unwrap()
563 }
564
565 fn enum1_bc_schema() -> Schema {
566 Schema::parse_str(r#"{"type":"enum", "name":"Enum1", "symbols":["B","C"]}"#).unwrap()
567 }
568
569 fn enum2_ab_schema() -> Schema {
570 Schema::parse_str(r#"{"type":"enum", "name":"Enum2", "symbols":["A","B"]}"#).unwrap()
571 }
572
573 fn empty_record1_schema() -> Schema {
574 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[]}"#).unwrap()
575 }
576
577 fn empty_record2_schema() -> Schema {
578 Schema::parse_str(r#"{"type":"record", "name":"Record2", "fields": []}"#).unwrap()
579 }
580
581 fn a_int_record1_schema() -> Schema {
582 Schema::parse_str(
583 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}]}"#,
584 )
585 .unwrap()
586 }
587
588 fn a_long_record1_schema() -> Schema {
589 Schema::parse_str(
590 r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"long"}]}"#,
591 )
592 .unwrap()
593 }
594
595 fn a_int_b_int_record1_schema() -> Schema {
596 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int"}]}"#).unwrap()
597 }
598
599 fn a_dint_record1_schema() -> Schema {
600 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}]}"#).unwrap()
601 }
602
603 fn a_int_b_dint_record1_schema() -> Schema {
604 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int"}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
605 }
606
607 fn a_dint_b_dint_record1_schema() -> Schema {
608 Schema::parse_str(r#"{"type":"record", "name":"Record1", "fields":[{"name":"a", "type":"int", "default":0}, {"name":"b", "type":"int", "default":0}]}"#).unwrap()
609 }
610
611 fn nested_record() -> Schema {
612 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}}]}"#).unwrap()
613 }
614
615 fn nested_optional_record() -> Schema {
616 Schema::parse_str(r#"{"type":"record","name":"parent","fields":[{"name":"attribute","type":["null",{"type":"record","name":"child","fields":[{"name":"id","type":"string"}]}],"default":null}]}"#).unwrap()
617 }
618
619 fn int_list_record_schema() -> Schema {
620 Schema::parse_str(r#"{"type":"record", "name":"List", "fields": [{"name": "head", "type": "int"},{"name": "tail", "type": "array", "items": "int"}]}"#).unwrap()
621 }
622
623 fn long_list_record_schema() -> Schema {
624 Schema::parse_str(
625 r#"
626 {
627 "type":"record", "name":"List", "fields": [
628 {"name": "head", "type": "long"},
629 {"name": "tail", "type": "array", "items": "long"}
630 ]}
631"#,
632 )
633 .unwrap()
634 }
635
636 fn union_schema(schemas: Vec<Schema>) -> Schema {
637 let schema_string = schemas
638 .iter()
639 .map(|s| s.canonical_form())
640 .collect::<Vec<String>>()
641 .join(",");
642 Schema::parse_str(&format!("[{schema_string}]")).unwrap()
643 }
644
645 fn empty_union_schema() -> Schema {
646 union_schema(vec![])
647 }
648
649 fn int_union_schema() -> Schema {
653 union_schema(vec![Schema::Int])
654 }
655
656 fn long_union_schema() -> Schema {
657 union_schema(vec![Schema::Long])
658 }
659
660 fn string_union_schema() -> Schema {
661 union_schema(vec![Schema::String])
662 }
663
664 fn int_string_union_schema() -> Schema {
665 union_schema(vec![Schema::Int, Schema::String])
666 }
667
668 fn string_int_union_schema() -> Schema {
669 union_schema(vec![Schema::String, Schema::Int])
670 }
671
672 #[test]
673 fn test_broken() {
674 assert_eq!(
675 CompatibilityError::MissingUnionElements,
676 SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema())
677 .unwrap_err()
678 )
679 }
680
681 #[test]
682 fn test_incompatible_reader_writer_pairs() {
683 let incompatible_schemas = vec![
684 (Schema::Null, Schema::Int),
686 (Schema::Null, Schema::Long),
687 (Schema::Boolean, Schema::Int),
689 (Schema::Int, Schema::Null),
691 (Schema::Int, Schema::Boolean),
692 (Schema::Int, Schema::Long),
693 (Schema::Int, Schema::Float),
694 (Schema::Int, Schema::Double),
695 (Schema::Long, Schema::Float),
697 (Schema::Long, Schema::Double),
698 (Schema::Float, Schema::Double),
700 (Schema::String, Schema::Boolean),
702 (Schema::String, Schema::Int),
703 (Schema::Bytes, Schema::Null),
705 (Schema::Bytes, Schema::Int),
706 (Schema::TimeMicros, Schema::Int),
708 (Schema::TimestampMillis, Schema::Int),
709 (Schema::TimestampMicros, Schema::Int),
710 (Schema::TimestampNanos, Schema::Int),
711 (Schema::LocalTimestampMillis, Schema::Int),
712 (Schema::LocalTimestampMicros, Schema::Int),
713 (Schema::LocalTimestampNanos, Schema::Int),
714 (Schema::Date, Schema::Long),
715 (Schema::TimeMillis, Schema::Long),
716 (int_array_schema(), long_array_schema()),
718 (int_map_schema(), int_array_schema()),
719 (int_array_schema(), int_map_schema()),
720 (int_map_schema(), long_map_schema()),
721 (enum1_ab_schema(), enum1_abc_schema()),
723 (enum1_bc_schema(), enum1_abc_schema()),
724 (enum1_ab_schema(), enum2_ab_schema()),
725 (Schema::Int, enum2_ab_schema()),
726 (enum2_ab_schema(), Schema::Int),
727 (int_union_schema(), int_string_union_schema()),
729 (string_union_schema(), int_string_union_schema()),
730 (empty_record2_schema(), empty_record1_schema()),
732 (a_int_record1_schema(), empty_record1_schema()),
733 (a_int_b_dint_record1_schema(), empty_record1_schema()),
734 (int_list_record_schema(), long_list_record_schema()),
735 (nested_record(), nested_optional_record()),
736 ];
737
738 assert!(incompatible_schemas
739 .iter()
740 .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err()));
741 }
742
743 #[rstest]
744 #[case(
746 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date"}]}"#,
747 r#"{"type": "record", "name": "record_a", "fields": [{"type": "long", "name": "date", "default": 18181}]}"#
748 )]
749 #[case(
751 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
752 r#"{"type": "fixed", "name": "EmployeeId", "size": 16, "default": "u00ffffffffffffx"}"#
753 )]
754 #[case(
756 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B"]}"#,
757 r#"{"type": "enum", "name":"Enum1", "symbols": ["A","B", "C"], "default": "C"}"#
758 )]
759 #[case(
761 r#"{"type": "map", "values": "int"}"#,
762 r#"{"type": "map", "values": "long"}"#
763 )]
764 #[case(r#"{"type": "int"}"#, r#"{"type": "int", "logicalType": "date"}"#)]
766 #[case(
768 r#"{"type": "int"}"#,
769 r#"{"type": "int", "logicalType": "time-millis"}"#
770 )]
771 #[case(
773 r#"{"type": "long"}"#,
774 r#"{"type": "long", "logicalType": "time-micros"}"#
775 )]
776 #[case(
778 r#"{"type": "long"}"#,
779 r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
780 )]
781 #[case(
783 r#"{"type": "long"}"#,
784 r#"{"type": "long", "logicalType": "timestamp-millis"}"#
785 )]
786 #[case(
788 r#"{"type": "long"}"#,
789 r#"{"type": "long", "logicalType": "timestamp-micros"}"#
790 )]
791 #[case(
793 r#"{"type": "long"}"#,
794 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#
795 )]
796 #[case(
798 r#"{"type": "long"}"#,
799 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#
800 )]
801 #[case(
803 r#"{"type": "long"}"#,
804 r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#
805 )]
806 #[case(
808 r#"{"type": "array", "items": "int"}"#,
809 r#"{"type": "array", "items": "long"}"#
810 )]
811 fn test_avro_3950_match_schemas_ok(
812 #[case] writer_schema_str: &str,
813 #[case] reader_schema_str: &str,
814 ) {
815 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
816 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
817
818 assert!(SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).is_ok());
819 }
820
821 #[rstest]
822 #[case(
824 r#"{"type": "record", "name":"record_a", "fields": [{"type": "long", "name": "date"}]}"#,
825 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
826 CompatibilityError::NameMismatch{writer_name: String::from("record_a"), reader_name: String::from("record_b")}
827 )]
828 #[case(
830 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
831 r#"{"type": "fixed", "name": "EmployeeId", "size": 20}"#,
832 CompatibilityError::FixedMismatch
833 )]
834 #[case(
836 r#"{"type": "enum", "name": "Enum1", "symbols": ["A","B"]}"#,
837 r#"{"type": "enum", "name": "Enum2", "symbols": ["A","B"]}"#,
838 CompatibilityError::NameMismatch{writer_name: String::from("Enum1"), reader_name: String::from("Enum2")}
839 )]
840 #[case(
842 r#"{"type":"map", "values": "long"}"#,
843 r#"{"type":"map", "values": "int"}"#,
844 CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
845 SchemaKind::Long,
846 SchemaKind::Float,
847 SchemaKind::Double,
848 SchemaKind::TimeMicros,
849 SchemaKind::TimestampMillis,
850 SchemaKind::TimestampMicros,
851 SchemaKind::TimestampNanos,
852 SchemaKind::LocalTimestampMillis,
853 SchemaKind::LocalTimestampMicros,
854 SchemaKind::LocalTimestampNanos,
855 ]}
856 )]
857 #[case(
859 r#"{"type": "array", "items": "long"}"#,
860 r#"{"type": "array", "items": "int"}"#,
861 CompatibilityError::TypeExpected {schema_type: String::from("readers_schema"), expected_type: vec![
862 SchemaKind::Long,
863 SchemaKind::Float,
864 SchemaKind::Double,
865 SchemaKind::TimeMicros,
866 SchemaKind::TimestampMillis,
867 SchemaKind::TimestampMicros,
868 SchemaKind::TimestampNanos,
869 SchemaKind::LocalTimestampMillis,
870 SchemaKind::LocalTimestampMicros,
871 SchemaKind::LocalTimestampNanos,
872 ]}
873 )]
874 #[case(
876 r#"{"type": "string"}"#,
877 r#"{"type": "int", "logicalType": "date"}"#,
878 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
879 SchemaKind::String,
880 SchemaKind::Bytes,
881 SchemaKind::Uuid,
882 ]}
883 )]
884 #[case(
886 r#"{"type": "string"}"#,
887 r#"{"type": "int", "logicalType": "time-millis"}"#,
888 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
889 SchemaKind::String,
890 SchemaKind::Bytes,
891 SchemaKind::Uuid,
892 ]}
893 )]
894 #[case(
896 r#"{"type": "int"}"#,
897 r#"{"type": "long", "logicalType": "time-micros"}"#,
898 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
899 SchemaKind::Int,
900 SchemaKind::Long,
901 SchemaKind::Float,
902 SchemaKind::Double,
903 SchemaKind::Date,
904 SchemaKind::TimeMillis
905 ]}
906 )]
907 #[case(
922 r#"{"type": "int"}"#,
923 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
924 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
925 SchemaKind::Int,
926 SchemaKind::Long,
927 SchemaKind::Float,
928 SchemaKind::Double,
929 SchemaKind::Date,
930 SchemaKind::TimeMillis
931 ]}
932 )]
933 #[case(
935 r#"{"type": "int"}"#,
936 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
937 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
938 SchemaKind::Int,
939 SchemaKind::Long,
940 SchemaKind::Float,
941 SchemaKind::Double,
942 SchemaKind::Date,
943 SchemaKind::TimeMillis
944 ]}
945 )]
946 #[case(
948 r#"{"type": "int"}"#,
949 r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
950 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
951 SchemaKind::Int,
952 SchemaKind::Long,
953 SchemaKind::Float,
954 SchemaKind::Double,
955 SchemaKind::Date,
956 SchemaKind::TimeMillis
957 ]}
958 )]
959 #[case(
961 r#"{"type": "int"}"#,
962 r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
963 CompatibilityError::TypeExpected{schema_type: String::from("readers_schema"), expected_type: vec![
964 SchemaKind::Int,
965 SchemaKind::Long,
966 SchemaKind::Float,
967 SchemaKind::Double,
968 SchemaKind::Date,
969 SchemaKind::TimeMillis
970 ]}
971 )]
972 #[case(
987 r#"{"type": "record", "name":"record_b", "fields": [{"type": "long", "name": "date"}]}"#,
988 r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
989 CompatibilityError::Inconclusive(String::from("writers_schema"))
990 )]
991 fn test_avro_3950_match_schemas_error(
992 #[case] writer_schema_str: &str,
993 #[case] reader_schema_str: &str,
994 #[case] expected_error: CompatibilityError,
995 ) {
996 let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
997 let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
998
999 assert_eq!(
1000 expected_error,
1001 SchemaCompatibility::match_schemas(&writer_schema, &reader_schema).unwrap_err()
1002 )
1003 }
1004
1005 #[test]
1006 fn test_compatible_reader_writer_pairs() {
1007 let compatible_schemas = vec![
1008 (Schema::Null, Schema::Null),
1009 (Schema::Long, Schema::Int),
1010 (Schema::Float, Schema::Int),
1011 (Schema::Float, Schema::Long),
1012 (Schema::Double, Schema::Long),
1013 (Schema::Double, Schema::Int),
1014 (Schema::Double, Schema::Float),
1015 (Schema::String, Schema::Bytes),
1016 (Schema::Bytes, Schema::String),
1017 (Schema::Uuid, Schema::Uuid),
1019 (Schema::Uuid, Schema::String),
1020 (Schema::Date, Schema::Int),
1021 (Schema::TimeMillis, Schema::Int),
1022 (Schema::TimeMicros, Schema::Long),
1023 (Schema::TimestampMillis, Schema::Long),
1024 (Schema::TimestampMicros, Schema::Long),
1025 (Schema::TimestampNanos, Schema::Long),
1026 (Schema::LocalTimestampMillis, Schema::Long),
1027 (Schema::LocalTimestampMicros, Schema::Long),
1028 (Schema::LocalTimestampNanos, Schema::Long),
1029 (Schema::String, Schema::Uuid),
1030 (Schema::Int, Schema::Date),
1031 (Schema::Int, Schema::TimeMillis),
1032 (Schema::Long, Schema::TimeMicros),
1033 (Schema::Long, Schema::TimestampMillis),
1034 (Schema::Long, Schema::TimestampMicros),
1035 (Schema::Long, Schema::TimestampNanos),
1036 (Schema::Long, Schema::LocalTimestampMillis),
1037 (Schema::Long, Schema::LocalTimestampMicros),
1038 (Schema::Long, Schema::LocalTimestampNanos),
1039 (int_array_schema(), int_array_schema()),
1040 (long_array_schema(), int_array_schema()),
1041 (int_map_schema(), int_map_schema()),
1042 (long_map_schema(), int_map_schema()),
1043 (enum1_ab_schema(), enum1_ab_schema()),
1044 (enum1_abc_schema(), enum1_ab_schema()),
1045 (empty_union_schema(), empty_union_schema()),
1046 (int_union_schema(), int_union_schema()),
1047 (int_string_union_schema(), string_int_union_schema()),
1048 (int_union_schema(), empty_union_schema()),
1049 (long_union_schema(), int_union_schema()),
1050 (int_union_schema(), Schema::Int),
1051 (Schema::Int, int_union_schema()),
1052 (empty_record1_schema(), empty_record1_schema()),
1053 (empty_record1_schema(), a_int_record1_schema()),
1054 (a_int_record1_schema(), a_int_record1_schema()),
1055 (a_dint_record1_schema(), a_int_record1_schema()),
1056 (a_dint_record1_schema(), a_dint_record1_schema()),
1057 (a_int_record1_schema(), a_dint_record1_schema()),
1058 (a_long_record1_schema(), a_int_record1_schema()),
1059 (a_int_record1_schema(), a_int_b_int_record1_schema()),
1060 (a_dint_record1_schema(), a_int_b_int_record1_schema()),
1061 (a_int_b_dint_record1_schema(), a_int_record1_schema()),
1062 (a_dint_b_dint_record1_schema(), empty_record1_schema()),
1063 (a_dint_b_dint_record1_schema(), a_int_record1_schema()),
1064 (a_int_b_int_record1_schema(), a_dint_b_dint_record1_schema()),
1065 (int_list_record_schema(), int_list_record_schema()),
1066 (long_list_record_schema(), long_list_record_schema()),
1067 (long_list_record_schema(), int_list_record_schema()),
1068 (nested_optional_record(), nested_record()),
1069 ];
1070
1071 assert!(compatible_schemas
1072 .iter()
1073 .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_ok()));
1074 }
1075
1076 fn writer_schema() -> Schema {
1077 Schema::parse_str(
1078 r#"
1079 {"type":"record", "name":"Record", "fields":[
1080 {"name":"oldfield1", "type":"int"},
1081 {"name":"oldfield2", "type":"string"}
1082 ]}
1083"#,
1084 )
1085 .unwrap()
1086 }
1087
1088 #[test]
1089 fn test_missing_field() -> TestResult {
1090 let reader_schema = Schema::parse_str(
1091 r#"
1092 {"type":"record", "name":"Record", "fields":[
1093 {"name":"oldfield1", "type":"int"}
1094 ]}
1095"#,
1096 )?;
1097 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok());
1098 assert_eq!(
1099 CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
1100 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1101 );
1102
1103 Ok(())
1104 }
1105
1106 #[test]
1107 fn test_missing_second_field() -> TestResult {
1108 let reader_schema = Schema::parse_str(
1109 r#"
1110 {"type":"record", "name":"Record", "fields":[
1111 {"name":"oldfield2", "type":"string"}
1112 ]}
1113"#,
1114 )?;
1115 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1116 assert_eq!(
1117 CompatibilityError::MissingDefaultValue(String::from("oldfield1")),
1118 SchemaCompatibility::can_read(&reader_schema, &writer_schema()).unwrap_err()
1119 );
1120
1121 Ok(())
1122 }
1123
1124 #[test]
1125 fn test_all_fields() -> TestResult {
1126 let reader_schema = Schema::parse_str(
1127 r#"
1128 {"type":"record", "name":"Record", "fields":[
1129 {"name":"oldfield1", "type":"int"},
1130 {"name":"oldfield2", "type":"string"}
1131 ]}
1132"#,
1133 )?;
1134 assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok());
1135 assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok());
1136
1137 Ok(())
1138 }
1139
/// A reader-only field with a default (`newfield1`) is accepted. The reverse
/// direction still fails because `oldfield2` has no default.
#[test]
fn test_new_field_with_default() -> TestResult {
    let with_default = Schema::parse_str(
        r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"},
        {"name":"newfield1", "type":"int", "default":42}
      ]}
"#,
    )?;
    let base = writer_schema();

    // `newfield1` is absent from the writer but carries a default.
    assert!(SchemaCompatibility::can_read(&base, &with_default).is_ok());

    let err = SchemaCompatibility::can_read(&with_default, &base).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
        err
    );

    Ok(())
}
1158
/// A reader-only field without a default is rejected in both directions, each
/// time naming the field that lacks a default.
#[test]
fn test_new_field() -> TestResult {
    let no_default = Schema::parse_str(
        r#"
      {"type":"record", "name":"Record", "fields":[
        {"name":"oldfield1", "type":"int"},
        {"name":"newfield1", "type":"int"}
      ]}
"#,
    )?;
    let base = writer_schema();

    let forward_err = SchemaCompatibility::can_read(&base, &no_default).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("newfield1")),
        forward_err
    );

    let backward_err = SchemaCompatibility::can_read(&no_default, &base).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("oldfield2")),
        backward_err
    );

    Ok(())
}
1180
/// An array writer is readable by an identical array reader but not by a map reader.
#[test]
fn test_array_writer_schema() {
    let array_reader = string_array_schema();
    let map_reader = string_map_schema();

    let ok = SchemaCompatibility::can_read(&string_array_schema(), &array_reader);
    assert!(ok.is_ok());

    let err = SchemaCompatibility::can_read(&string_array_schema(), &map_reader).unwrap_err();
    assert_eq!(
        CompatibilityError::Inconclusive(String::from("writers_schema")),
        err
    );
}
1192
/// Primitive checks: `string` -> `string` is accepted; `int` -> `string` is not.
#[test]
fn test_primitive_writer_schema() {
    let string_reader = Schema::String;
    assert!(SchemaCompatibility::can_read(&Schema::String, &string_reader).is_ok());

    // The error lists the reader kinds reported as acceptable for an `int` writer.
    let accepted_for_int = vec![
        SchemaKind::Int,
        SchemaKind::Long,
        SchemaKind::Float,
        SchemaKind::Double,
        SchemaKind::Date,
        SchemaKind::TimeMillis,
    ];
    let expected = CompatibilityError::TypeExpected {
        schema_type: String::from("readers_schema"),
        expected_type: accepted_for_int,
    };
    assert_eq!(
        expected,
        SchemaCompatibility::can_read(&Schema::Int, &Schema::String).unwrap_err()
    );
}
1212
/// A reader union must cover every branch of a writer union. The subset
/// direction (narrow writer, wide reader) is fine.
#[test]
fn test_union_reader_writer_subset_incompatibility() {
    let int_or_string = union_schema(vec![Schema::Int, Schema::String]);
    let string_only = union_schema(vec![Schema::String]);

    // The reader union has no `int` branch for the writer's `int` values.
    let err = SchemaCompatibility::can_read(&int_or_string, &string_only).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);

    assert!(SchemaCompatibility::can_read(&string_only, &int_or_string).is_ok());
}
1225
/// A field whose type changes from `string` to `int` produces a
/// `FieldTypeMismatch` that wraps the underlying type error.
#[test]
fn test_incompatible_record_field() -> TestResult {
    let writer = Schema::parse_str(
        r#"
      {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
          {"name":"field1", "type":"string"}
      ]}
    "#,
    )?;

    let reader = Schema::parse_str(
        r#"
      {"type":"record", "name":"MyRecord", "namespace":"ns", "fields": [
          {"name":"field1", "type":"int"}
      ]}
    "#,
    )?;

    let inner = CompatibilityError::TypeExpected {
        schema_type: "readers_schema".to_owned(),
        expected_type: vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Uuid],
    };
    let expected = CompatibilityError::FieldTypeMismatch("field1".to_owned(), Box::new(inner));
    assert_eq!(
        expected,
        SchemaCompatibility::can_read(&writer, &reader).unwrap_err()
    );

    Ok(())
}
1257
/// An enum reader must know every writer symbol (neither schema here declares a default).
#[test]
fn test_enum_symbols() -> TestResult {
    let two_symbols = Schema::parse_str(
        r#"
      {"type":"enum", "name":"MyEnum", "symbols":["A","B"]}
"#,
    )?;
    let three_symbols =
        Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?;

    // The two-symbol reader has no `C`.
    let err = SchemaCompatibility::can_read(&three_symbols, &two_symbols).unwrap_err();
    assert_eq!(CompatibilityError::MissingSymbols, err);

    // Every symbol of the smaller writer enum exists in the larger reader enum.
    assert!(SchemaCompatibility::can_read(&two_symbols, &three_symbols).is_ok());

    Ok(())
}
1275
1276 fn point_2d_schema() -> Schema {
1277 Schema::parse_str(
1278 r#"
1279 {"type":"record", "name":"Point2D", "fields":[
1280 {"name":"x", "type":"double"},
1281 {"name":"y", "type":"double"}
1282 ]}
1283 "#,
1284 )
1285 .unwrap()
1286 }
1287
1288 fn point_2d_fullname_schema() -> Schema {
1289 Schema::parse_str(
1290 r#"
1291 {"type":"record", "name":"Point", "namespace":"written", "fields":[
1292 {"name":"x", "type":"double"},
1293 {"name":"y", "type":"double"}
1294 ]}
1295 "#,
1296 )
1297 .unwrap()
1298 }
1299
1300 fn point_3d_no_default_schema() -> Schema {
1301 Schema::parse_str(
1302 r#"
1303 {"type":"record", "name":"Point", "fields":[
1304 {"name":"x", "type":"double"},
1305 {"name":"y", "type":"double"},
1306 {"name":"z", "type":"double"}
1307 ]}
1308 "#,
1309 )
1310 .unwrap()
1311 }
1312
1313 fn point_3d_schema() -> Schema {
1314 Schema::parse_str(
1315 r#"
1316 {"type":"record", "name":"Point3D", "fields":[
1317 {"name":"x", "type":"double"},
1318 {"name":"y", "type":"double"},
1319 {"name":"z", "type":"double", "default": 0.0}
1320 ]}
1321 "#,
1322 )
1323 .unwrap()
1324 }
1325
1326 fn point_3d_match_name_schema() -> Schema {
1327 Schema::parse_str(
1328 r#"
1329 {"type":"record", "name":"Point", "fields":[
1330 {"name":"x", "type":"double"},
1331 {"name":"y", "type":"double"},
1332 {"name":"z", "type":"double", "default": 0.0}
1333 ]}
1334 "#,
1335 )
1336 .unwrap()
1337 }
1338
/// The only record branch is `Point` with a required `z`. The writer
/// `written.Point` (x, y) is rejected.
#[test]
fn test_union_resolution_no_structure_match() {
    let reader = union_schema(vec![Schema::Null, point_3d_no_default_schema()]);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1348
/// The reader union lists `Point2D` before `Point3D`. Neither has the writer's
/// full name `written.Point`, and the check is expected to fail.
#[test]
fn test_union_resolution_first_structure_match_2d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_2d_schema(),
        point_3d_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1363
/// Same branches with `Point3D` listed before `Point2D`. The check is still
/// expected to fail.
#[test]
fn test_union_resolution_first_structure_match_3d() {
    let branches = vec![
        Schema::Null,
        point_3d_no_default_schema(),
        point_3d_schema(),
        point_2d_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1378
/// The union contains `Point` (same short name as `written.Point`, no
/// namespace). The check is still expected to report missing union elements.
#[test]
fn test_union_resolution_named_structure_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    let err = SchemaCompatibility::can_read(&writer, &reader).unwrap_err();
    assert_eq!(CompatibilityError::MissingUnionElements, err);
}
1393
/// Once the union also contains the writer's own `written.Point`, the check succeeds.
#[test]
fn test_union_resolution_full_name_match() {
    let branches = vec![
        Schema::Null,
        point_2d_schema(),
        point_3d_match_name_schema(),
        point_3d_schema(),
        point_2d_fullname_schema(),
    ];
    let reader = union_schema(branches);
    let writer = point_2d_fullname_schema();

    assert!(SchemaCompatibility::can_read(&writer, &reader).is_ok());
}
1406
/// AVRO-3772: the writer's enum symbol `clubs` is absent from the reader's
/// enum. Decoding must yield the reader's `default` symbol (`spades`, index 1
/// in the reader's symbol list).
#[test]
fn test_avro_3772_enum_default() -> TestResult {
    let writer_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                "type": "enum",
                "name": "suit",
                "symbols": ["diamonds", "spades", "clubs", "hearts"],
                "default": "spades"
              }
            }
          ]
        }
        "#;

    let reader_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                "type": "enum",
                "name": "suit",
                "symbols": ["diamonds", "spades", "ninja", "hearts"],
                "default": "spades"
              }
            }
          ]
        }
      "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    // `clubs` exists only in the writer's enum.
    record.put("c", "clubs");
    // Propagate failures through `TestResult`, like the other fallible calls here,
    // instead of a bare `unwrap` that loses the error type.
    writer.append(record)?;
    let input = writer.into_inner()?;
    let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
    assert_eq!(
        reader.next().unwrap()?,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(1, "spades".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1470
/// AVRO-3772: the reader's enum has fewer symbols, in a different order.
///
/// `hearts` is the writer's 4th symbol but the reader's 1st. The test expects
/// `Value::Enum(0, "hearts")`, i.e. the value is resolved by symbol name.
#[test]
fn test_avro_3772_enum_default_less_symbols() -> TestResult {
    let writer_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                "type": "enum",
                "name": "suit",
                "symbols": ["diamonds", "spades", "clubs", "hearts"],
                "default": "spades"
              }
            }
          ]
        }
        "#;

    let reader_raw_schema = r#"
        {
          "type": "record",
          "name": "test",
          "fields": [
            {"name": "a", "type": "long", "default": 42},
            {"name": "b", "type": "string"},
            {
              "name": "c",
              "type": {
                  "type": "enum",
                  "name": "suit",
                  "symbols": ["hearts", "spades"],
                  "default": "spades"
              }
            }
          ]
        }
      "#;
    let writer_schema = Schema::parse_str(writer_raw_schema)?;
    let reader_schema = Schema::parse_str(reader_raw_schema)?;
    let mut writer = Writer::with_codec(&writer_schema, Vec::new(), Codec::Null);
    let mut record = Record::new(writer.schema()).unwrap();
    record.put("a", 27i64);
    record.put("b", "foo");
    record.put("c", "hearts");
    // Propagate failures through `TestResult`, like the other fallible calls here,
    // instead of a bare `unwrap` that loses the error type.
    writer.append(record)?;
    let input = writer.into_inner()?;
    let mut reader = Reader::with_schema(&reader_schema, &input[..])?;
    assert_eq!(
        reader.next().unwrap()?,
        Value::Record(vec![
            ("a".to_string(), Value::Long(27)),
            ("b".to_string(), Value::String("foo".to_string())),
            ("c".to_string(), Value::Enum(0, "hearts".to_string())),
        ])
    );
    assert!(reader.next().is_none());

    Ok(())
}
1534
/// AVRO-3894: two versions that differ only by a field alias (`date` gains
/// alias `time`) must pass `mutual_read`.
#[test]
fn avro_3894_take_aliases_into_account_when_serializing_for_schema_compatibility() -> TestResult
{
    let without_alias = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date"}
            ]
        }"#,
    )?;

    let with_alias = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date", "aliases" : [ "time" ]}
            ]
        }"#,
    )?;

    let outcome = SchemaCompatibility::mutual_read(&without_alias, &with_alias);
    assert!(outcome.is_ok());

    Ok(())
}
1568
/// AVRO-3917: a reader field `date` with alias `time` can read a writer field
/// named `time`.
///
/// The alias does not help in the opposite direction. There the reader field
/// `time` has no default and no match in the writer.
#[test]
fn avro_3917_take_aliases_into_account_for_schema_compatibility() -> TestResult {
    let aliased_date = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "date", "aliases" : [ "time" ]}
            ]
        }"#,
    )?;

    let plain_time = Schema::parse_str(
        r#"
        {
            "type": "record",
            "name": "Conference",
            "namespace": "advdaba",
            "fields": [
                {"type": "string", "name": "name"},
                {"type": "long", "name": "time"}
            ]
        }"#,
    )?;

    // The reader's alias `time` matches the writer's field `time`.
    assert!(SchemaCompatibility::can_read(&plain_time, &aliased_date).is_ok());

    let err = SchemaCompatibility::can_read(&aliased_date, &plain_time).unwrap_err();
    assert_eq!(
        CompatibilityError::MissingDefaultValue(String::from("time")),
        err
    );

    Ok(())
}
1605
/// AVRO-3898: named types match by unqualified name. Each pair is a writer
/// without a namespace and a reader in `my.namespace`.
#[test]
fn test_avro_3898_record_schemas_match_by_unqualified_name() -> TestResult {
    // Record schemas.
    let record_writer = Schema::parse_str(
        r#"{
        "type": "record",
        "name": "Statistics",
        "fields": [
          { "name": "success", "type": "int" },
          { "name": "fail", "type": "int" },
          { "name": "time", "type": "string" },
          { "name": "max", "type": "int", "default": 0 }
        ]
      }"#,
    )?;
    let record_reader = Schema::parse_str(
        r#"{
        "type": "record",
        "name": "Statistics",
        "namespace": "my.namespace",
        "fields": [
          { "name": "success", "type": "int" },
          { "name": "fail", "type": "int" },
          { "name": "time", "type": "string" },
          { "name": "average", "type": "int", "default": 0}
        ]
      }"#,
    )?;

    // Enum schemas.
    let enum_writer = Schema::parse_str(
        r#"{
        "type": "enum",
        "name": "Suit",
        "symbols": ["diamonds", "spades", "clubs"]
      }"#,
    )?;
    let enum_reader = Schema::parse_str(
        r#"{
        "type": "enum",
        "name": "Suit",
        "namespace": "my.namespace",
        "symbols": ["diamonds", "spades", "clubs", "hearts"]
      }"#,
    )?;

    // Fixed schemas.
    let fixed_writer = Schema::parse_str(
        r#"{
        "type": "fixed",
        "name": "EmployeeId",
        "size": 16
      }"#,
    )?;
    let fixed_reader = Schema::parse_str(
        r#"{
        "type": "fixed",
        "name": "EmployeeId",
        "namespace": "my.namespace",
        "size": 16
      }"#,
    )?;

    let pairs = [
        (record_writer, record_reader),
        (enum_writer, enum_reader),
        (fixed_writer, fixed_reader),
    ];
    for (writer, reader) in &pairs {
        assert!(SchemaCompatibility::can_read(writer, reader).is_ok());
    }

    Ok(())
}
1681
/// A plain map/array field can be read by a `["null", ...]` union reader.
///
/// The reverse direction fails, and the test pins the error's `Display` text.
#[test]
fn test_can_read_compatibility_errors() -> TestResult {
    let cases = [
        (
            Schema::parse_str(
                r#"{
    "type": "record",
    "name": "StatisticsMap",
    "fields": [
        {"name": "average", "type": "int", "default": 0},
        {"name": "success", "type": {"type": "map", "values": "int"}}
    ]
    }"#)?,
            Schema::parse_str(
                r#"{
    "type": "record",
    "name": "StatisticsMap",
    "fields": [
        {"name": "average", "type": "int", "default": 0},
        {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null}
    ]
    }"#)?,
            "Incompatible schemata! Field 'success' in reader schema does not match the type in the writer schema"
        ),
        (
            Schema::parse_str(
                r#"{
        "type": "record",
        "name": "StatisticsArray",
        "fields": [
            {"name": "max_values", "type": {"type": "array", "items": "int"}}
        ]
    }"#)?,
            Schema::parse_str(
                r#"{
        "type": "record",
        "name": "StatisticsArray",
        "fields": [
            {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null}
        ]
    }"#)?,
            "Incompatible schemata! Field 'max_values' in reader schema does not match the type in the writer schema"
        ),
    ];

    for (plain, nullable, expected_message) in cases {
        assert!(SchemaCompatibility::can_read(&plain, &nullable).is_ok());

        let message = SchemaCompatibility::can_read(&nullable, &plain)
            .unwrap_err()
            .to_string();
        assert_eq!(expected_message, message);
    }

    Ok(())
}
1739
/// AVRO-3974: a compatibility check must succeed on a schema whose field type
/// is a name (`avro.Child`). That name refers to another schema parsed in the
/// same `parse_list` call.
#[test]
fn avro_3974_can_read_schema_references() -> TestResult {
    // A plain array suffices: the inputs are only borrowed by `parse_list`
    // (a `vec!` here triggers clippy::useless_vec).
    let schema_strs = [
        r#"{
      "type": "record",
      "name": "Child",
      "namespace": "avro",
      "fields": [
        {
          "name": "val",
          "type": "int"
        }
      ]
    }
    "#,
        r#"{
      "type": "record",
      "name": "Parent",
      "namespace": "avro",
      "fields": [
        {
          "name": "child",
          "type": "avro.Child"
        }
      ]
    }
    "#,
    ];

    // Propagate parse failures through `TestResult`, consistent with the `?` below,
    // rather than panicking through `unwrap`.
    let schemas = Schema::parse_list(&schema_strs)?;
    SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;

    Ok(())
}
1774}