1use crate::{
20 AvroResult,
21 error::{Details, Error},
22 schema_equality, types,
23 util::MapHelper,
24 validator::{
25 validate_enum_symbol_name, validate_namespace, validate_record_field_name,
26 validate_schema_name,
27 },
28};
29use digest::Digest;
30use log::{debug, error, warn};
31use serde::{
32 Deserialize, Serialize, Serializer,
33 ser::{SerializeMap, SerializeSeq},
34};
35use serde_json::{Map, Value};
36use std::{
37 borrow::Borrow,
38 collections::{BTreeMap, HashMap, HashSet},
39 fmt,
40 fmt::Debug,
41 hash::Hash,
42 io::Read,
43 str::FromStr,
44};
45use strum_macros::{Display, EnumDiscriminants, EnumString};
46
47pub struct SchemaFingerprint {
51 pub bytes: Vec<u8>,
52}
53
54impl fmt::Display for SchemaFingerprint {
55 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
56 write!(
57 f,
58 "{}",
59 self.bytes
60 .iter()
61 .map(|byte| format!("{byte:02x}"))
62 .collect::<Vec<String>>()
63 .join("")
64 )
65 }
66}
67
68#[derive(Clone, Debug, EnumDiscriminants, Display)]
72#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
73pub enum Schema {
74 Null,
76 Boolean,
78 Int,
80 Long,
82 Float,
84 Double,
86 Bytes,
89 String,
92 Array(ArraySchema),
95 Map(MapSchema),
99 Union(UnionSchema),
101 Record(RecordSchema),
103 Enum(EnumSchema),
105 Fixed(FixedSchema),
107 Decimal(DecimalSchema),
110 BigDecimal,
113 Uuid,
115 Date,
118 TimeMillis,
121 TimeMicros,
124 TimestampMillis,
126 TimestampMicros,
128 TimestampNanos,
130 LocalTimestampMillis,
132 LocalTimestampMicros,
134 LocalTimestampNanos,
136 Duration,
138 Ref { name: Name },
140}
141
142#[derive(Clone, Debug, PartialEq)]
143pub struct MapSchema {
144 pub types: Box<Schema>,
145 pub attributes: BTreeMap<String, Value>,
146}
147
148#[derive(Clone, Debug, PartialEq)]
149pub struct ArraySchema {
150 pub items: Box<Schema>,
151 pub attributes: BTreeMap<String, Value>,
152}
153
154impl PartialEq for Schema {
155 fn eq(&self, other: &Self) -> bool {
160 schema_equality::compare_schemata(self, other)
161 }
162}
163
164impl SchemaKind {
165 pub fn is_primitive(self) -> bool {
166 matches!(
167 self,
168 SchemaKind::Null
169 | SchemaKind::Boolean
170 | SchemaKind::Int
171 | SchemaKind::Long
172 | SchemaKind::Double
173 | SchemaKind::Float
174 | SchemaKind::Bytes
175 | SchemaKind::String,
176 )
177 }
178
179 pub fn is_named(self) -> bool {
180 matches!(
181 self,
182 SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref
183 )
184 }
185}
186
187impl From<&types::Value> for SchemaKind {
188 fn from(value: &types::Value) -> Self {
189 use crate::types::Value;
190 match value {
191 Value::Null => Self::Null,
192 Value::Boolean(_) => Self::Boolean,
193 Value::Int(_) => Self::Int,
194 Value::Long(_) => Self::Long,
195 Value::Float(_) => Self::Float,
196 Value::Double(_) => Self::Double,
197 Value::Bytes(_) => Self::Bytes,
198 Value::String(_) => Self::String,
199 Value::Array(_) => Self::Array,
200 Value::Map(_) => Self::Map,
201 Value::Union(_, _) => Self::Union,
202 Value::Record(_) => Self::Record,
203 Value::Enum(_, _) => Self::Enum,
204 Value::Fixed(_, _) => Self::Fixed,
205 Value::Decimal { .. } => Self::Decimal,
206 Value::BigDecimal(_) => Self::BigDecimal,
207 Value::Uuid(_) => Self::Uuid,
208 Value::Date(_) => Self::Date,
209 Value::TimeMillis(_) => Self::TimeMillis,
210 Value::TimeMicros(_) => Self::TimeMicros,
211 Value::TimestampMillis(_) => Self::TimestampMillis,
212 Value::TimestampMicros(_) => Self::TimestampMicros,
213 Value::TimestampNanos(_) => Self::TimestampNanos,
214 Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
215 Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
216 Value::LocalTimestampNanos(_) => Self::LocalTimestampNanos,
217 Value::Duration { .. } => Self::Duration,
218 }
219 }
220}
221
222#[derive(Clone, Debug, Hash, PartialEq, Eq)]
233pub struct Name {
234 pub name: String,
235 pub namespace: Namespace,
236}
237
238pub type Documentation = Option<String>;
240pub type Aliases = Option<Vec<Alias>>;
242pub(crate) type Names = HashMap<Name, Schema>;
244pub type NamesRef<'a> = HashMap<Name, &'a Schema>;
246pub type Namespace = Option<String>;
248
249impl Name {
250 pub fn new(name: &str) -> AvroResult<Self> {
254 let (name, namespace) = Name::get_name_and_namespace(name)?;
255 Ok(Self {
256 name,
257 namespace: namespace.filter(|ns| !ns.is_empty()),
258 })
259 }
260
261 fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
262 validate_schema_name(name)
263 }
264
265 pub(crate) fn parse(
267 complex: &Map<String, Value>,
268 enclosing_namespace: &Namespace,
269 ) -> AvroResult<Self> {
270 let (name, namespace_from_name) = complex
271 .name()
272 .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
273 .ok_or(Details::GetNameField)?;
274 let type_name = match complex.get("type") {
276 Some(Value::Object(complex_type)) => complex_type.name().or(None),
277 _ => None,
278 };
279
280 let namespace = namespace_from_name
281 .or_else(|| {
282 complex
283 .string("namespace")
284 .or_else(|| enclosing_namespace.clone())
285 })
286 .filter(|ns| !ns.is_empty());
287
288 if let Some(ref ns) = namespace {
289 validate_namespace(ns)?;
290 }
291
292 Ok(Self {
293 name: type_name.unwrap_or(name),
294 namespace,
295 })
296 }
297
298 pub fn fullname(&self, default_namespace: Namespace) -> String {
303 if self.name.contains('.') {
304 self.name.clone()
305 } else {
306 let namespace = self.namespace.clone().or(default_namespace);
307
308 match namespace {
309 Some(ref namespace) if !namespace.is_empty() => {
310 format!("{}.{}", namespace, self.name)
311 }
312 _ => self.name.clone(),
313 }
314 }
315 }
316
317 pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
331 Name {
332 name: self.name.clone(),
333 namespace: self
334 .namespace
335 .clone()
336 .or_else(|| enclosing_namespace.clone().filter(|ns| !ns.is_empty())),
337 }
338 }
339}
340
341impl From<&str> for Name {
342 fn from(name: &str) -> Self {
343 Name::new(name).unwrap()
344 }
345}
346
347impl fmt::Display for Name {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 f.write_str(&self.fullname(None)[..])
350 }
351}
352
353impl<'de> Deserialize<'de> for Name {
354 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
355 where
356 D: serde::de::Deserializer<'de>,
357 {
358 Value::deserialize(deserializer).and_then(|value| {
359 use serde::de::Error;
360 if let Value::Object(json) = value {
361 Name::parse(&json, &None).map_err(Error::custom)
362 } else {
363 Err(Error::custom(format!("Expected a JSON object: {value:?}")))
364 }
365 })
366 }
367}
368
369#[derive(Clone, Debug, Hash, PartialEq, Eq)]
372pub struct Alias(Name);
373
374impl Alias {
375 pub fn new(name: &str) -> AvroResult<Self> {
376 Name::new(name).map(Self)
377 }
378
379 pub fn name(&self) -> String {
380 self.0.name.clone()
381 }
382
383 pub fn namespace(&self) -> Namespace {
384 self.0.namespace.clone()
385 }
386
387 pub fn fullname(&self, default_namespace: Namespace) -> String {
388 self.0.fullname(default_namespace)
389 }
390
391 pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name {
392 self.0.fully_qualified_name(default_namespace)
393 }
394}
395
396impl From<&str> for Alias {
397 fn from(name: &str) -> Self {
398 Alias::new(name).unwrap()
399 }
400}
401
402impl Serialize for Alias {
403 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
404 where
405 S: Serializer,
406 {
407 serializer.serialize_str(&self.fullname(None))
408 }
409}
410
411#[derive(Debug)]
412pub struct ResolvedSchema<'s> {
413 names_ref: NamesRef<'s>,
414 schemata: Vec<&'s Schema>,
415}
416
417impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
418 type Error = Error;
419
420 fn try_from(schema: &'s Schema) -> AvroResult<Self> {
421 let names = HashMap::new();
422 let mut rs = ResolvedSchema {
423 names_ref: names,
424 schemata: vec![schema],
425 };
426 rs.resolve(rs.get_schemata(), &None, None)?;
427 Ok(rs)
428 }
429}
430
431impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
432 type Error = Error;
433
434 fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
435 let names = HashMap::new();
436 let mut rs = ResolvedSchema {
437 names_ref: names,
438 schemata,
439 };
440 rs.resolve(rs.get_schemata(), &None, None)?;
441 Ok(rs)
442 }
443}
444
445impl<'s> ResolvedSchema<'s> {
446 pub fn get_schemata(&self) -> Vec<&'s Schema> {
447 self.schemata.clone()
448 }
449
450 pub fn get_names(&self) -> &NamesRef<'s> {
451 &self.names_ref
452 }
453
454 pub fn new_with_known_schemata<'n>(
458 schemata_to_resolve: Vec<&'s Schema>,
459 enclosing_namespace: &Namespace,
460 known_schemata: &'n NamesRef<'n>,
461 ) -> AvroResult<Self> {
462 let names = HashMap::new();
463 let mut rs = ResolvedSchema {
464 names_ref: names,
465 schemata: schemata_to_resolve,
466 };
467 rs.resolve(rs.get_schemata(), enclosing_namespace, Some(known_schemata))?;
468 Ok(rs)
469 }
470
471 fn resolve<'n>(
472 &mut self,
473 schemata: Vec<&'s Schema>,
474 enclosing_namespace: &Namespace,
475 known_schemata: Option<&'n NamesRef<'n>>,
476 ) -> AvroResult<()> {
477 for schema in schemata {
478 match schema {
479 Schema::Array(schema) => {
480 self.resolve(vec![&schema.items], enclosing_namespace, known_schemata)?
481 }
482 Schema::Map(schema) => {
483 self.resolve(vec![&schema.types], enclosing_namespace, known_schemata)?
484 }
485 Schema::Union(UnionSchema { schemas, .. }) => {
486 for schema in schemas {
487 self.resolve(vec![schema], enclosing_namespace, known_schemata)?
488 }
489 }
490 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
491 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
492 if self
493 .names_ref
494 .insert(fully_qualified_name.clone(), schema)
495 .is_some()
496 {
497 return Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into());
498 }
499 }
500 Schema::Record(RecordSchema { name, fields, .. }) => {
501 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
502 if self
503 .names_ref
504 .insert(fully_qualified_name.clone(), schema)
505 .is_some()
506 {
507 return Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into());
508 } else {
509 let record_namespace = fully_qualified_name.namespace;
510 for field in fields {
511 self.resolve(vec![&field.schema], &record_namespace, known_schemata)?
512 }
513 }
514 }
515 Schema::Ref { name } => {
516 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
517 if !self.names_ref.contains_key(&fully_qualified_name) {
519 let is_resolved_with_known_schemas = known_schemata
520 .as_ref()
521 .map(|names| names.contains_key(&fully_qualified_name))
522 .unwrap_or(false);
523 if !is_resolved_with_known_schemas {
524 return Err(Details::SchemaResolutionError(fully_qualified_name).into());
525 }
526 }
527 }
528 _ => (),
529 }
530 }
531 Ok(())
532 }
533}
534
535pub(crate) struct ResolvedOwnedSchema {
536 names: Names,
537 root_schema: Schema,
538}
539
540impl TryFrom<Schema> for ResolvedOwnedSchema {
541 type Error = Error;
542
543 fn try_from(schema: Schema) -> AvroResult<Self> {
544 let names = HashMap::new();
545 let mut rs = ResolvedOwnedSchema {
546 names,
547 root_schema: schema,
548 };
549 resolve_names(&rs.root_schema, &mut rs.names, &None)?;
550 Ok(rs)
551 }
552}
553
554impl ResolvedOwnedSchema {
555 pub(crate) fn get_root_schema(&self) -> &Schema {
556 &self.root_schema
557 }
558 pub(crate) fn get_names(&self) -> &Names {
559 &self.names
560 }
561}
562
563pub(crate) fn resolve_names(
564 schema: &Schema,
565 names: &mut Names,
566 enclosing_namespace: &Namespace,
567) -> AvroResult<()> {
568 match schema {
569 Schema::Array(schema) => resolve_names(&schema.items, names, enclosing_namespace),
570 Schema::Map(schema) => resolve_names(&schema.types, names, enclosing_namespace),
571 Schema::Union(UnionSchema { schemas, .. }) => {
572 for schema in schemas {
573 resolve_names(schema, names, enclosing_namespace)?
574 }
575 Ok(())
576 }
577 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
578 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
579 if names
580 .insert(fully_qualified_name.clone(), schema.clone())
581 .is_some()
582 {
583 Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into())
584 } else {
585 Ok(())
586 }
587 }
588 Schema::Record(RecordSchema { name, fields, .. }) => {
589 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
590 if names
591 .insert(fully_qualified_name.clone(), schema.clone())
592 .is_some()
593 {
594 Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into())
595 } else {
596 let record_namespace = fully_qualified_name.namespace;
597 for field in fields {
598 resolve_names(&field.schema, names, &record_namespace)?
599 }
600 Ok(())
601 }
602 }
603 Schema::Ref { name } => {
604 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
605 names
606 .get(&fully_qualified_name)
607 .map(|_| ())
608 .ok_or_else(|| Details::SchemaResolutionError(fully_qualified_name).into())
609 }
610 _ => Ok(()),
611 }
612}
613
614pub(crate) fn resolve_names_with_schemata(
615 schemata: &Vec<&Schema>,
616 names: &mut Names,
617 enclosing_namespace: &Namespace,
618) -> AvroResult<()> {
619 for schema in schemata {
620 resolve_names(schema, names, enclosing_namespace)?;
621 }
622 Ok(())
623}
624
625#[derive(bon::Builder, Clone, Debug, PartialEq)]
627pub struct RecordField {
628 pub name: String,
630 #[builder(default)]
632 pub doc: Documentation,
633 pub aliases: Option<Vec<String>>,
635 pub default: Option<Value>,
639 pub schema: Schema,
641 #[builder(default = RecordFieldOrder::Ignore)]
645 pub order: RecordFieldOrder,
646 #[builder(default)]
648 pub position: usize,
649 #[builder(default = BTreeMap::new())]
651 pub custom_attributes: BTreeMap<String, Value>,
652}
653
654#[derive(Clone, Debug, Eq, PartialEq, EnumString)]
656#[strum(serialize_all = "kebab_case")]
657pub enum RecordFieldOrder {
658 Ascending,
659 Descending,
660 Ignore,
661}
662
663impl RecordField {
664 fn parse(
666 field: &Map<String, Value>,
667 position: usize,
668 parser: &mut Parser,
669 enclosing_record: &Name,
670 ) -> AvroResult<Self> {
671 let name = field.name().ok_or(Details::GetNameFieldFromRecord)?;
672
673 validate_record_field_name(&name)?;
674
675 let schema = parser.parse_complex(
677 field,
678 &enclosing_record.namespace,
679 RecordSchemaParseLocation::FromField,
680 )?;
681
682 let default = field.get("default").cloned();
683 Self::resolve_default_value(
684 &schema,
685 &name,
686 &enclosing_record.fullname(None),
687 &parser.parsed_schemas,
688 &default,
689 )?;
690
691 let aliases = field.get("aliases").and_then(|aliases| {
692 aliases.as_array().map(|aliases| {
693 aliases
694 .iter()
695 .flat_map(|alias| alias.as_str())
696 .map(|alias| alias.to_string())
697 .collect::<Vec<String>>()
698 })
699 });
700
701 let order = field
702 .get("order")
703 .and_then(|order| order.as_str())
704 .and_then(|order| RecordFieldOrder::from_str(order).ok())
705 .unwrap_or(RecordFieldOrder::Ascending);
706
707 Ok(RecordField {
708 name,
709 doc: field.doc(),
710 default,
711 aliases,
712 order,
713 position,
714 custom_attributes: RecordField::get_field_custom_attributes(field, &schema),
715 schema,
716 })
717 }
718
719 fn resolve_default_value(
720 field_schema: &Schema,
721 field_name: &str,
722 record_name: &str,
723 names: &Names,
724 default: &Option<Value>,
725 ) -> AvroResult<()> {
726 if let Some(value) = default {
727 let avro_value = types::Value::from(value.clone());
728 match field_schema {
729 Schema::Union(union_schema) => {
730 let schemas = &union_schema.schemas;
731 let resolved = schemas.iter().any(|schema| {
732 avro_value
733 .to_owned()
734 .resolve_internal(schema, names, &schema.namespace(), &None)
735 .is_ok()
736 });
737
738 if !resolved {
739 let schema: Option<&Schema> = schemas.first();
740 return match schema {
741 Some(first_schema) => Err(Details::GetDefaultUnion(
742 SchemaKind::from(first_schema),
743 types::ValueKind::from(avro_value),
744 )
745 .into()),
746 None => Err(Details::EmptyUnion.into()),
747 };
748 }
749 }
750 _ => {
751 let resolved = avro_value
752 .resolve_internal(field_schema, names, &field_schema.namespace(), &None)
753 .is_ok();
754
755 if !resolved {
756 return Err(Details::GetDefaultRecordField(
757 field_name.to_string(),
758 record_name.to_string(),
759 field_schema.canonical_form(),
760 )
761 .into());
762 }
763 }
764 };
765 }
766
767 Ok(())
768 }
769
770 fn get_field_custom_attributes(
771 field: &Map<String, Value>,
772 schema: &Schema,
773 ) -> BTreeMap<String, Value> {
774 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
775 for (key, value) in field {
776 match key.as_str() {
777 "type" | "name" | "doc" | "default" | "order" | "position" | "aliases"
778 | "logicalType" => continue,
779 key if key == "symbols" && matches!(schema, Schema::Enum(_)) => continue,
780 key if key == "size" && matches!(schema, Schema::Fixed(_)) => continue,
781 key if key == "items" && matches!(schema, Schema::Array(_)) => continue,
782 key if key == "values" && matches!(schema, Schema::Map(_)) => continue,
783 _ => custom_attributes.insert(key.clone(), value.clone()),
784 };
785 }
786 custom_attributes
787 }
788
789 pub fn is_nullable(&self) -> bool {
791 match self.schema {
792 Schema::Union(ref inner) => inner.is_nullable(),
793 _ => false,
794 }
795 }
796}
797
798#[derive(bon::Builder, Debug, Clone)]
800pub struct RecordSchema {
801 pub name: Name,
803 #[builder(default)]
805 pub aliases: Aliases,
806 #[builder(default)]
808 pub doc: Documentation,
809 pub fields: Vec<RecordField>,
811 pub lookup: BTreeMap<String, usize>,
814 #[builder(default = BTreeMap::new())]
816 pub attributes: BTreeMap<String, Value>,
817}
818
819#[derive(bon::Builder, Debug, Clone)]
821pub struct EnumSchema {
822 pub name: Name,
824 #[builder(default)]
826 pub aliases: Aliases,
827 #[builder(default)]
829 pub doc: Documentation,
830 pub symbols: Vec<String>,
832 pub default: Option<String>,
834 #[builder(default = BTreeMap::new())]
836 pub attributes: BTreeMap<String, Value>,
837}
838
839#[derive(bon::Builder, Debug, Clone)]
841pub struct FixedSchema {
842 pub name: Name,
844 #[builder(default)]
846 pub aliases: Aliases,
847 #[builder(default)]
849 pub doc: Documentation,
850 pub size: usize,
852 pub default: Option<String>,
854 #[builder(default = BTreeMap::new())]
856 pub attributes: BTreeMap<String, Value>,
857}
858
859impl FixedSchema {
860 fn serialize_to_map<S>(&self, mut map: S::SerializeMap) -> Result<S::SerializeMap, S::Error>
861 where
862 S: Serializer,
863 {
864 map.serialize_entry("type", "fixed")?;
865 if let Some(ref n) = self.name.namespace {
866 map.serialize_entry("namespace", n)?;
867 }
868 map.serialize_entry("name", &self.name.name)?;
869 if let Some(ref docstr) = self.doc {
870 map.serialize_entry("doc", docstr)?;
871 }
872 map.serialize_entry("size", &self.size)?;
873
874 if let Some(ref aliases) = self.aliases {
875 map.serialize_entry("aliases", aliases)?;
876 }
877
878 for attr in &self.attributes {
879 map.serialize_entry(attr.0, attr.1)?;
880 }
881
882 Ok(map)
883 }
884}
885
886#[derive(Debug, Clone)]
891pub struct DecimalSchema {
892 pub precision: DecimalMetadata,
894 pub scale: DecimalMetadata,
896 pub inner: Box<Schema>,
898}
899
900#[derive(Debug, Clone)]
902pub struct UnionSchema {
903 pub(crate) schemas: Vec<Schema>,
905 variant_index: BTreeMap<SchemaKind, usize>,
910}
911
912impl UnionSchema {
913 pub fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
915 let mut vindex = BTreeMap::new();
916 for (i, schema) in schemas.iter().enumerate() {
917 if let Schema::Union(_) = schema {
918 return Err(Details::GetNestedUnion.into());
919 }
920 let kind = SchemaKind::from(schema);
921 if !kind.is_named() && vindex.insert(kind, i).is_some() {
922 return Err(Details::GetUnionDuplicate.into());
923 }
924 }
925 Ok(UnionSchema {
926 schemas,
927 variant_index: vindex,
928 })
929 }
930
931 pub fn variants(&self) -> &[Schema] {
933 &self.schemas
934 }
935
936 pub fn is_nullable(&self) -> bool {
938 self.schemas.iter().any(|x| matches!(x, Schema::Null))
939 }
940
941 pub fn find_schema_with_known_schemata<S: Borrow<Schema> + Debug>(
947 &self,
948 value: &types::Value,
949 known_schemata: Option<&HashMap<Name, S>>,
950 enclosing_namespace: &Namespace,
951 ) -> Option<(usize, &Schema)> {
952 let schema_kind = SchemaKind::from(value);
953 if let Some(&i) = self.variant_index.get(&schema_kind) {
954 Some((i, &self.schemas[i]))
956 } else {
957 let mut collected_names: HashMap<Name, &Schema> = known_schemata
961 .map(|names| {
962 names
963 .iter()
964 .map(|(name, schema)| (name.clone(), schema.borrow()))
965 .collect()
966 })
967 .unwrap_or_default();
968
969 self.schemas.iter().enumerate().find(|(_, schema)| {
970 let resolved_schema = ResolvedSchema::new_with_known_schemata(
971 vec![*schema],
972 enclosing_namespace,
973 &collected_names,
974 )
975 .expect("Schema didn't successfully parse");
976 let resolved_names = resolved_schema.names_ref;
977
978 collected_names.extend(resolved_names);
980 let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());
981
982 value
983 .clone()
984 .resolve_internal(schema, &collected_names, namespace, &None)
985 .is_ok()
986 })
987 }
988 }
989}
990
991impl PartialEq for UnionSchema {
993 fn eq(&self, other: &UnionSchema) -> bool {
994 self.schemas.eq(&other.schemas)
995 }
996}
997
998type DecimalMetadata = usize;
999pub(crate) type Precision = DecimalMetadata;
1000pub(crate) type Scale = DecimalMetadata;
1001
1002fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
1003 Ok(if value.is_u64() {
1004 let num = value
1005 .as_u64()
1006 .ok_or_else(|| Details::GetU64FromJson(value.clone()))?;
1007 num.try_into()
1008 .map_err(|e| Details::ConvertU64ToUsize(e, num))?
1009 } else if value.is_i64() {
1010 let num = value
1011 .as_i64()
1012 .ok_or_else(|| Details::GetI64FromJson(value.clone()))?;
1013 num.try_into()
1014 .map_err(|e| Details::ConvertI64ToUsize(e, num))?
1015 } else {
1016 return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
1017 })
1018}
1019
1020#[derive(Debug, Default)]
1021enum RecordSchemaParseLocation {
1022 #[default]
1024 Root,
1025
1026 FromField,
1028}
1029
1030#[derive(Default)]
1031struct Parser {
1032 input_schemas: HashMap<Name, Value>,
1033 resolving_schemas: Names,
1037 input_order: Vec<Name>,
1038 parsed_schemas: Names,
1041}
1042
1043impl Schema {
1044 pub fn canonical_form(&self) -> String {
1049 let json = serde_json::to_value(self)
1050 .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
1051 let mut defined_names = HashSet::new();
1052 parsing_canonical_form(&json, &mut defined_names)
1053 }
1054
1055 pub fn independent_canonical_form(&self, schemata: &[Schema]) -> Result<String, Error> {
1061 let mut this = self.clone();
1062 this.denormalize(schemata)?;
1063 Ok(this.canonical_form())
1064 }
1065
1066 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1073 let mut d = D::new();
1074 d.update(self.canonical_form());
1075 SchemaFingerprint {
1076 bytes: d.finalize().to_vec(),
1077 }
1078 }
1079
1080 pub fn parse_str(input: &str) -> Result<Schema, Error> {
1082 let mut parser = Parser::default();
1083 parser.parse_str(input)
1084 }
1085
1086 pub fn parse_list(input: impl IntoIterator<Item = impl AsRef<str>>) -> AvroResult<Vec<Schema>> {
1094 let input = input.into_iter();
1095 let input_len = input.size_hint().0;
1096 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input_len);
1097 let mut input_order: Vec<Name> = Vec::with_capacity(input_len);
1098 for json in input {
1099 let json = json.as_ref();
1100 let schema: Value = serde_json::from_str(json).map_err(Details::ParseSchemaJson)?;
1101 if let Value::Object(inner) = &schema {
1102 let name = Name::parse(inner, &None)?;
1103 let previous_value = input_schemas.insert(name.clone(), schema);
1104 if previous_value.is_some() {
1105 return Err(Details::NameCollision(name.fullname(None)).into());
1106 }
1107 input_order.push(name);
1108 } else {
1109 return Err(Details::GetNameField.into());
1110 }
1111 }
1112 let mut parser = Parser {
1113 input_schemas,
1114 resolving_schemas: HashMap::default(),
1115 input_order,
1116 parsed_schemas: HashMap::with_capacity(input_len),
1117 };
1118 parser.parse_list()
1119 }
1120
1121 pub fn parse_str_with_list(
1134 schema: &str,
1135 schemata: impl IntoIterator<Item = impl AsRef<str>>,
1136 ) -> AvroResult<(Schema, Vec<Schema>)> {
1137 let schemata = schemata.into_iter();
1138 let schemata_len = schemata.size_hint().0;
1139 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(schemata_len);
1140 let mut input_order: Vec<Name> = Vec::with_capacity(schemata_len);
1141 for json in schemata {
1142 let json = json.as_ref();
1143 let schema: Value = serde_json::from_str(json).map_err(Details::ParseSchemaJson)?;
1144 if let Value::Object(inner) = &schema {
1145 let name = Name::parse(inner, &None)?;
1146 if let Some(_previous) = input_schemas.insert(name.clone(), schema) {
1147 return Err(Details::NameCollision(name.fullname(None)).into());
1148 }
1149 input_order.push(name);
1150 } else {
1151 return Err(Details::GetNameField.into());
1152 }
1153 }
1154 let mut parser = Parser {
1155 input_schemas,
1156 resolving_schemas: HashMap::default(),
1157 input_order,
1158 parsed_schemas: HashMap::with_capacity(schemata_len),
1159 };
1160 parser.parse_input_schemas()?;
1161
1162 let value = serde_json::from_str(schema).map_err(Details::ParseSchemaJson)?;
1163 let schema = parser.parse(&value, &None)?;
1164 let schemata = parser.parse_list()?;
1165 Ok((schema, schemata))
1166 }
1167
1168 pub fn parse_reader(reader: &mut (impl Read + ?Sized)) -> AvroResult<Schema> {
1170 let mut buf = String::new();
1171 match reader.read_to_string(&mut buf) {
1172 Ok(_) => Self::parse_str(&buf),
1173 Err(e) => Err(Details::ReadSchemaFromReader(e).into()),
1174 }
1175 }
1176
1177 pub fn parse(value: &Value) -> AvroResult<Schema> {
1179 let mut parser = Parser::default();
1180 parser.parse(value, &None)
1181 }
1182
1183 pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
1186 let mut parser = Parser {
1187 input_schemas: HashMap::with_capacity(1),
1188 resolving_schemas: Names::default(),
1189 input_order: Vec::with_capacity(1),
1190 parsed_schemas: names,
1191 };
1192 parser.parse(value, &None)
1193 }
1194
1195 pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
1197 match self {
1198 Schema::Record(RecordSchema { attributes, .. })
1199 | Schema::Enum(EnumSchema { attributes, .. })
1200 | Schema::Fixed(FixedSchema { attributes, .. })
1201 | Schema::Array(ArraySchema { attributes, .. })
1202 | Schema::Map(MapSchema { attributes, .. }) => Some(attributes),
1203 _ => None,
1204 }
1205 }
1206
1207 pub fn name(&self) -> Option<&Name> {
1209 match self {
1210 Schema::Ref { name, .. }
1211 | Schema::Record(RecordSchema { name, .. })
1212 | Schema::Enum(EnumSchema { name, .. })
1213 | Schema::Fixed(FixedSchema { name, .. }) => Some(name),
1214 _ => None,
1215 }
1216 }
1217
1218 pub fn namespace(&self) -> Namespace {
1220 self.name().and_then(|n| n.namespace.clone())
1221 }
1222
1223 pub fn aliases(&self) -> Option<&Vec<Alias>> {
1225 match self {
1226 Schema::Record(RecordSchema { aliases, .. })
1227 | Schema::Enum(EnumSchema { aliases, .. })
1228 | Schema::Fixed(FixedSchema { aliases, .. }) => aliases.as_ref(),
1229 _ => None,
1230 }
1231 }
1232
1233 pub fn doc(&self) -> Option<&String> {
1235 match self {
1236 Schema::Record(RecordSchema { doc, .. })
1237 | Schema::Enum(EnumSchema { doc, .. })
1238 | Schema::Fixed(FixedSchema { doc, .. }) => doc.as_ref(),
1239 _ => None,
1240 }
1241 }
1242
1243 pub fn map(types: Schema) -> Self {
1245 Schema::Map(MapSchema {
1246 types: Box::new(types),
1247 attributes: Default::default(),
1248 })
1249 }
1250
1251 pub fn map_with_attributes(types: Schema, attributes: BTreeMap<String, Value>) -> Self {
1253 Schema::Map(MapSchema {
1254 types: Box::new(types),
1255 attributes,
1256 })
1257 }
1258
1259 pub fn array(items: Schema) -> Self {
1261 Schema::Array(ArraySchema {
1262 items: Box::new(items),
1263 attributes: Default::default(),
1264 })
1265 }
1266
1267 pub fn array_with_attributes(items: Schema, attributes: BTreeMap<String, Value>) -> Self {
1269 Schema::Array(ArraySchema {
1270 items: Box::new(items),
1271 attributes,
1272 })
1273 }
1274
1275 fn denormalize(&mut self, schemata: &[Schema]) -> AvroResult<()> {
1276 match self {
1277 Schema::Ref { name } => {
1278 let replacement_schema = schemata
1279 .iter()
1280 .find(|s| s.name().map(|n| *n == *name).unwrap_or(false));
1281 if let Some(schema) = replacement_schema {
1282 let mut denorm = schema.clone();
1283 denorm.denormalize(schemata)?;
1284 *self = denorm;
1285 } else {
1286 return Err(Details::SchemaResolutionError(name.clone()).into());
1287 }
1288 }
1289 Schema::Record(record_schema) => {
1290 for field in &mut record_schema.fields {
1291 field.schema.denormalize(schemata)?;
1292 }
1293 }
1294 Schema::Array(array_schema) => {
1295 array_schema.items.denormalize(schemata)?;
1296 }
1297 Schema::Map(map_schema) => {
1298 map_schema.types.denormalize(schemata)?;
1299 }
1300 Schema::Union(union_schema) => {
1301 for schema in &mut union_schema.schemas {
1302 schema.denormalize(schemata)?;
1303 }
1304 }
1305 _ => (),
1306 }
1307 Ok(())
1308 }
1309}
1310
1311impl Parser {
1312 fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
1314 let value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
1315 self.parse(&value, &None)
1316 }
1317
1318 fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
1321 self.parse_input_schemas()?;
1322
1323 let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
1324 for name in self.input_order.drain(0..) {
1325 let parsed = self
1326 .parsed_schemas
1327 .remove(&name)
1328 .expect("One of the input schemas was unexpectedly not parsed");
1329 parsed_schemas.push(parsed);
1330 }
1331 Ok(parsed_schemas)
1332 }
1333
1334 fn parse_input_schemas(&mut self) -> Result<(), Error> {
1336 while !self.input_schemas.is_empty() {
1337 let next_name = self
1338 .input_schemas
1339 .keys()
1340 .next()
1341 .expect("Input schemas unexpectedly empty")
1342 .to_owned();
1343 let (name, value) = self
1344 .input_schemas
1345 .remove_entry(&next_name)
1346 .expect("Key unexpectedly missing");
1347 let parsed = self.parse(&value, &None)?;
1348 self.parsed_schemas
1349 .insert(get_schema_type_name(name, value), parsed);
1350 }
1351 Ok(())
1352 }
1353
1354 fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
1357 match *value {
1358 Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
1359 Value::Object(ref data) => {
1360 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1361 }
1362 Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
1363 _ => Err(Details::ParseSchemaFromValidJson.into()),
1364 }
1365 }
1366
1367 fn parse_known_schema(
1371 &mut self,
1372 name: &str,
1373 enclosing_namespace: &Namespace,
1374 ) -> AvroResult<Schema> {
1375 match name {
1376 "null" => Ok(Schema::Null),
1377 "boolean" => Ok(Schema::Boolean),
1378 "int" => Ok(Schema::Int),
1379 "long" => Ok(Schema::Long),
1380 "double" => Ok(Schema::Double),
1381 "float" => Ok(Schema::Float),
1382 "bytes" => Ok(Schema::Bytes),
1383 "string" => Ok(Schema::String),
1384 _ => self.fetch_schema_ref(name, enclosing_namespace),
1385 }
1386 }
1387
1388 fn fetch_schema_ref(
1398 &mut self,
1399 name: &str,
1400 enclosing_namespace: &Namespace,
1401 ) -> AvroResult<Schema> {
1402 fn get_schema_ref(parsed: &Schema) -> Schema {
1403 match parsed {
1404 &Schema::Record(RecordSchema { ref name, .. })
1405 | &Schema::Enum(EnumSchema { ref name, .. })
1406 | &Schema::Fixed(FixedSchema { ref name, .. }) => {
1407 Schema::Ref { name: name.clone() }
1408 }
1409 _ => parsed.clone(),
1410 }
1411 }
1412
1413 let name = Name::new(name)?;
1414 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
1415
1416 if self.parsed_schemas.contains_key(&fully_qualified_name) {
1417 return Ok(Schema::Ref {
1418 name: fully_qualified_name,
1419 });
1420 }
1421 if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
1422 return Ok(resolving_schema.clone());
1423 }
1424
1425 match name.name.as_str() {
1427 "record" | "enum" | "fixed" => {
1428 return Err(Details::InvalidSchemaRecord(name.to_string()).into());
1429 }
1430 _ => (),
1431 }
1432
1433 let value = self
1434 .input_schemas
1435 .remove(&fully_qualified_name)
1436 .ok_or_else(|| Details::ParsePrimitive(fully_qualified_name.fullname(None)))?;
1438
1439 let parsed = self.parse(&value, &None)?;
1441 self.parsed_schemas
1442 .insert(get_schema_type_name(name, value), parsed.clone());
1443
1444 Ok(get_schema_ref(&parsed))
1445 }
1446
1447 fn parse_precision_and_scale(
1448 complex: &Map<String, Value>,
1449 ) -> Result<(Precision, Scale), Error> {
1450 fn get_decimal_integer(
1451 complex: &Map<String, Value>,
1452 key: &'static str,
1453 ) -> Result<DecimalMetadata, Error> {
1454 match complex.get(key) {
1455 Some(Value::Number(value)) => parse_json_integer_for_decimal(value),
1456 None => {
1457 if key == "scale" {
1458 Ok(0)
1459 } else {
1460 Err(Details::GetDecimalMetadataFromJson(key).into())
1461 }
1462 }
1463 Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
1464 key: key.into(),
1465 value: value.clone(),
1466 }
1467 .into()),
1468 }
1469 }
1470 let precision = get_decimal_integer(complex, "precision")?;
1471 let scale = get_decimal_integer(complex, "scale")?;
1472
1473 if precision < 1 {
1474 return Err(Details::DecimalPrecisionMuBePositive { precision }.into());
1475 }
1476
1477 if precision < scale {
1478 Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into())
1479 } else {
1480 Ok((precision, scale))
1481 }
1482 }
1483
1484 fn parse_complex(
1490 &mut self,
1491 complex: &Map<String, Value>,
1492 enclosing_namespace: &Namespace,
1493 parse_location: RecordSchemaParseLocation,
1494 ) -> AvroResult<Schema> {
1495 fn parse_as_native_complex(
1497 complex: &Map<String, Value>,
1498 parser: &mut Parser,
1499 enclosing_namespace: &Namespace,
1500 ) -> AvroResult<Schema> {
1501 match complex.get("type") {
1502 Some(value) => match value {
1503 Value::String(s) if s == "fixed" => {
1504 parser.parse_fixed(complex, enclosing_namespace)
1505 }
1506 _ => parser.parse(value, enclosing_namespace),
1507 },
1508 None => Err(Details::GetLogicalTypeField.into()),
1509 }
1510 }
1511
1512 fn try_convert_to_logical_type<F>(
1519 logical_type: &str,
1520 schema: Schema,
1521 supported_schema_kinds: &[SchemaKind],
1522 convert: F,
1523 ) -> AvroResult<Schema>
1524 where
1525 F: Fn(Schema) -> AvroResult<Schema>,
1526 {
1527 let kind = SchemaKind::from(schema.clone());
1528 if supported_schema_kinds.contains(&kind) {
1529 convert(schema)
1530 } else {
1531 warn!(
1532 "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!"
1533 );
1534 Ok(schema)
1535 }
1536 }
1537
1538 match complex.get("logicalType") {
1539 Some(Value::String(t)) => match t.as_str() {
1540 "decimal" => {
1541 return try_convert_to_logical_type(
1542 "decimal",
1543 parse_as_native_complex(complex, self, enclosing_namespace)?,
1544 &[SchemaKind::Fixed, SchemaKind::Bytes],
1545 |inner| -> AvroResult<Schema> {
1546 match Self::parse_precision_and_scale(complex) {
1547 Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
1548 precision,
1549 scale,
1550 inner: Box::new(inner),
1551 })),
1552 Err(err) => {
1553 warn!("Ignoring invalid decimal logical type: {err}");
1554 Ok(inner)
1555 }
1556 }
1557 },
1558 );
1559 }
1560 "big-decimal" => {
1561 return try_convert_to_logical_type(
1562 "big-decimal",
1563 parse_as_native_complex(complex, self, enclosing_namespace)?,
1564 &[SchemaKind::Bytes],
1565 |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
1566 );
1567 }
1568 "uuid" => {
1569 return try_convert_to_logical_type(
1570 "uuid",
1571 parse_as_native_complex(complex, self, enclosing_namespace)?,
1572 &[SchemaKind::String, SchemaKind::Fixed],
1573 |schema| match schema {
1574 Schema::String => Ok(Schema::Uuid),
1575 Schema::Fixed(FixedSchema { size: 16, .. }) => Ok(Schema::Uuid),
1576 Schema::Fixed(FixedSchema { size, .. }) => {
1577 warn!(
1578 "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {schema:?}"
1579 );
1580 Ok(schema)
1581 }
1582 _ => {
1583 warn!("Ignoring invalid uuid logical type for schema: {schema:?}");
1584 Ok(schema)
1585 }
1586 },
1587 );
1588 }
1589 "date" => {
1590 return try_convert_to_logical_type(
1591 "date",
1592 parse_as_native_complex(complex, self, enclosing_namespace)?,
1593 &[SchemaKind::Int],
1594 |_| -> AvroResult<Schema> { Ok(Schema::Date) },
1595 );
1596 }
1597 "time-millis" => {
1598 return try_convert_to_logical_type(
1599 "date",
1600 parse_as_native_complex(complex, self, enclosing_namespace)?,
1601 &[SchemaKind::Int],
1602 |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
1603 );
1604 }
1605 "time-micros" => {
1606 return try_convert_to_logical_type(
1607 "time-micros",
1608 parse_as_native_complex(complex, self, enclosing_namespace)?,
1609 &[SchemaKind::Long],
1610 |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
1611 );
1612 }
1613 "timestamp-millis" => {
1614 return try_convert_to_logical_type(
1615 "timestamp-millis",
1616 parse_as_native_complex(complex, self, enclosing_namespace)?,
1617 &[SchemaKind::Long],
1618 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
1619 );
1620 }
1621 "timestamp-micros" => {
1622 return try_convert_to_logical_type(
1623 "timestamp-micros",
1624 parse_as_native_complex(complex, self, enclosing_namespace)?,
1625 &[SchemaKind::Long],
1626 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
1627 );
1628 }
1629 "timestamp-nanos" => {
1630 return try_convert_to_logical_type(
1631 "timestamp-nanos",
1632 parse_as_native_complex(complex, self, enclosing_namespace)?,
1633 &[SchemaKind::Long],
1634 |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
1635 );
1636 }
1637 "local-timestamp-millis" => {
1638 return try_convert_to_logical_type(
1639 "local-timestamp-millis",
1640 parse_as_native_complex(complex, self, enclosing_namespace)?,
1641 &[SchemaKind::Long],
1642 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
1643 );
1644 }
1645 "local-timestamp-micros" => {
1646 return try_convert_to_logical_type(
1647 "local-timestamp-micros",
1648 parse_as_native_complex(complex, self, enclosing_namespace)?,
1649 &[SchemaKind::Long],
1650 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
1651 );
1652 }
1653 "local-timestamp-nanos" => {
1654 return try_convert_to_logical_type(
1655 "local-timestamp-nanos",
1656 parse_as_native_complex(complex, self, enclosing_namespace)?,
1657 &[SchemaKind::Long],
1658 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
1659 );
1660 }
1661 "duration" => {
1662 return try_convert_to_logical_type(
1663 "duration",
1664 parse_as_native_complex(complex, self, enclosing_namespace)?,
1665 &[SchemaKind::Fixed],
1666 |_| -> AvroResult<Schema> { Ok(Schema::Duration) },
1667 );
1668 }
1669 _ => {}
1672 },
1673 Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
1677 _ => {}
1678 }
1679 match complex.get("type") {
1680 Some(Value::String(t)) => match t.as_str() {
1681 "record" => match parse_location {
1682 RecordSchemaParseLocation::Root => {
1683 self.parse_record(complex, enclosing_namespace)
1684 }
1685 RecordSchemaParseLocation::FromField => {
1686 self.fetch_schema_ref(t, enclosing_namespace)
1687 }
1688 },
1689 "enum" => self.parse_enum(complex, enclosing_namespace),
1690 "array" => self.parse_array(complex, enclosing_namespace),
1691 "map" => self.parse_map(complex, enclosing_namespace),
1692 "fixed" => self.parse_fixed(complex, enclosing_namespace),
1693 other => self.parse_known_schema(other, enclosing_namespace),
1694 },
1695 Some(Value::Object(data)) => {
1696 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1697 }
1698 Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
1699 Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()),
1700 None => Err(Details::GetComplexTypeField.into()),
1701 }
1702 }
1703
1704 fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
1705 let resolving_schema = Schema::Ref { name: name.clone() };
1706 self.resolving_schemas
1707 .insert(name.clone(), resolving_schema.clone());
1708
1709 let namespace = &name.namespace;
1710
1711 if let Some(aliases) = aliases {
1712 aliases.iter().for_each(|alias| {
1713 let alias_fullname = alias.fully_qualified_name(namespace);
1714 self.resolving_schemas
1715 .insert(alias_fullname, resolving_schema.clone());
1716 });
1717 }
1718 }
1719
1720 fn register_parsed_schema(
1721 &mut self,
1722 fully_qualified_name: &Name,
1723 schema: &Schema,
1724 aliases: &Aliases,
1725 ) {
1726 self.parsed_schemas
1729 .insert(fully_qualified_name.clone(), schema.clone());
1730 self.resolving_schemas.remove(fully_qualified_name);
1731
1732 let namespace = &fully_qualified_name.namespace;
1733
1734 if let Some(aliases) = aliases {
1735 aliases.iter().for_each(|alias| {
1736 let alias_fullname = alias.fully_qualified_name(namespace);
1737 self.resolving_schemas.remove(&alias_fullname);
1738 self.parsed_schemas.insert(alias_fullname, schema.clone());
1739 });
1740 }
1741 }
1742
1743 fn get_already_seen_schema(
1745 &self,
1746 complex: &Map<String, Value>,
1747 enclosing_namespace: &Namespace,
1748 ) -> Option<&Schema> {
1749 match complex.get("type") {
1750 Some(Value::String(typ)) => {
1751 let name = Name::new(typ.as_str())
1752 .unwrap()
1753 .fully_qualified_name(enclosing_namespace);
1754 self.resolving_schemas
1755 .get(&name)
1756 .or_else(|| self.parsed_schemas.get(&name))
1757 }
1758 _ => None,
1759 }
1760 }
1761
1762 fn parse_record(
1765 &mut self,
1766 complex: &Map<String, Value>,
1767 enclosing_namespace: &Namespace,
1768 ) -> AvroResult<Schema> {
1769 let fields_opt = complex.get("fields");
1770
1771 if fields_opt.is_none() {
1772 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1773 return Ok(seen.clone());
1774 }
1775 }
1776
1777 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1778 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1779
1780 let mut lookup = BTreeMap::new();
1781
1782 self.register_resolving_schema(&fully_qualified_name, &aliases);
1783
1784 debug!("Going to parse record schema: {:?}", &fully_qualified_name);
1785
1786 let fields: Vec<RecordField> = fields_opt
1787 .and_then(|fields| fields.as_array())
1788 .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
1789 .and_then(|fields| {
1790 fields
1791 .iter()
1792 .filter_map(|field| field.as_object())
1793 .enumerate()
1794 .map(|(position, field)| {
1795 RecordField::parse(field, position, self, &fully_qualified_name)
1796 })
1797 .collect::<Result<_, _>>()
1798 })?;
1799
1800 for field in &fields {
1801 if let Some(_old) = lookup.insert(field.name.clone(), field.position) {
1802 return Err(Details::FieldNameDuplicate(field.name.clone()).into());
1803 }
1804
1805 if let Some(ref field_aliases) = field.aliases {
1806 for alias in field_aliases {
1807 lookup.insert(alias.clone(), field.position);
1808 }
1809 }
1810 }
1811
1812 let schema = Schema::Record(RecordSchema {
1813 name: fully_qualified_name.clone(),
1814 aliases: aliases.clone(),
1815 doc: complex.doc(),
1816 fields,
1817 lookup,
1818 attributes: self.get_custom_attributes(complex, vec!["fields"]),
1819 });
1820
1821 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1822 Ok(schema)
1823 }
1824
1825 fn get_custom_attributes(
1826 &self,
1827 complex: &Map<String, Value>,
1828 excluded: Vec<&'static str>,
1829 ) -> BTreeMap<String, Value> {
1830 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
1831 for (key, value) in complex {
1832 match key.as_str() {
1833 "type" | "name" | "namespace" | "doc" | "aliases" => continue,
1834 candidate if excluded.contains(&candidate) => continue,
1835 _ => custom_attributes.insert(key.clone(), value.clone()),
1836 };
1837 }
1838 custom_attributes
1839 }
1840
1841 fn parse_enum(
1844 &mut self,
1845 complex: &Map<String, Value>,
1846 enclosing_namespace: &Namespace,
1847 ) -> AvroResult<Schema> {
1848 let symbols_opt = complex.get("symbols");
1849
1850 if symbols_opt.is_none() {
1851 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1852 return Ok(seen.clone());
1853 }
1854 }
1855
1856 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1857 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1858
1859 let symbols: Vec<String> = symbols_opt
1860 .and_then(|v| v.as_array())
1861 .ok_or(Details::GetEnumSymbolsField)
1862 .and_then(|symbols| {
1863 symbols
1864 .iter()
1865 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1866 .collect::<Option<_>>()
1867 .ok_or(Details::GetEnumSymbols)
1868 })?;
1869
1870 let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
1871 for symbol in symbols.iter() {
1872 validate_enum_symbol_name(symbol)?;
1873
1874 if existing_symbols.contains(&symbol) {
1876 return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
1877 }
1878
1879 existing_symbols.insert(symbol);
1880 }
1881
1882 let mut default: Option<String> = None;
1883 if let Some(value) = complex.get("default") {
1884 if let Value::String(ref s) = *value {
1885 default = Some(s.clone());
1886 } else {
1887 return Err(Details::EnumDefaultWrongType(value.clone()).into());
1888 }
1889 }
1890
1891 if let Some(ref value) = default {
1892 let resolved = types::Value::from(value.clone())
1893 .resolve_enum(&symbols, &Some(value.to_string()), &None)
1894 .is_ok();
1895 if !resolved {
1896 return Err(Details::GetEnumDefault {
1897 symbol: value.to_string(),
1898 symbols,
1899 }
1900 .into());
1901 }
1902 }
1903
1904 let schema = Schema::Enum(EnumSchema {
1905 name: fully_qualified_name.clone(),
1906 aliases: aliases.clone(),
1907 doc: complex.doc(),
1908 symbols,
1909 default,
1910 attributes: self.get_custom_attributes(complex, vec!["symbols"]),
1911 });
1912
1913 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1914
1915 Ok(schema)
1916 }
1917
1918 fn parse_array(
1921 &mut self,
1922 complex: &Map<String, Value>,
1923 enclosing_namespace: &Namespace,
1924 ) -> AvroResult<Schema> {
1925 complex
1926 .get("items")
1927 .ok_or_else(|| Details::GetArrayItemsField.into())
1928 .and_then(|items| self.parse(items, enclosing_namespace))
1929 .map(|items| {
1930 Schema::array_with_attributes(
1931 items,
1932 self.get_custom_attributes(complex, vec!["items"]),
1933 )
1934 })
1935 }
1936
1937 fn parse_map(
1940 &mut self,
1941 complex: &Map<String, Value>,
1942 enclosing_namespace: &Namespace,
1943 ) -> AvroResult<Schema> {
1944 complex
1945 .get("values")
1946 .ok_or_else(|| Details::GetMapValuesField.into())
1947 .and_then(|items| self.parse(items, enclosing_namespace))
1948 .map(|items| {
1949 Schema::map_with_attributes(
1950 items,
1951 self.get_custom_attributes(complex, vec!["values"]),
1952 )
1953 })
1954 }
1955
1956 fn parse_union(
1959 &mut self,
1960 items: &[Value],
1961 enclosing_namespace: &Namespace,
1962 ) -> AvroResult<Schema> {
1963 items
1964 .iter()
1965 .map(|v| self.parse(v, enclosing_namespace))
1966 .collect::<Result<Vec<_>, _>>()
1967 .and_then(|schemas| {
1968 if schemas.is_empty() {
1969 error!(
1970 "Union schemas should have at least two members! \
1971 Please enable debug logging to find out which Record schema \
1972 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1973 );
1974 } else if schemas.len() == 1 {
1975 warn!(
1976 "Union schema with just one member! Consider dropping the union! \
1977 Please enable debug logging to find out which Record schema \
1978 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1979 );
1980 }
1981 Ok(Schema::Union(UnionSchema::new(schemas)?))
1982 })
1983 }
1984
1985 fn parse_fixed(
1988 &mut self,
1989 complex: &Map<String, Value>,
1990 enclosing_namespace: &Namespace,
1991 ) -> AvroResult<Schema> {
1992 let size_opt = complex.get("size");
1993 if size_opt.is_none() {
1994 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1995 return Ok(seen.clone());
1996 }
1997 }
1998
1999 let doc = complex.get("doc").and_then(|v| match &v {
2000 &Value::String(docstr) => Some(docstr.clone()),
2001 _ => None,
2002 });
2003
2004 let size = match size_opt {
2005 Some(size) => size
2006 .as_u64()
2007 .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())),
2008 None => Err(Details::GetFixedSizeField),
2009 }?;
2010
2011 let default = complex.get("default").and_then(|v| match &v {
2012 &Value::String(default) => Some(default.clone()),
2013 _ => None,
2014 });
2015
2016 if default.is_some() {
2017 let len = default.clone().unwrap().len();
2018 if len != size as usize {
2019 return Err(Details::FixedDefaultLenSizeMismatch(len, size).into());
2020 }
2021 }
2022
2023 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
2024 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
2025
2026 let schema = Schema::Fixed(FixedSchema {
2027 name: fully_qualified_name.clone(),
2028 aliases: aliases.clone(),
2029 doc,
2030 size: size as usize,
2031 default,
2032 attributes: self.get_custom_attributes(complex, vec!["size"]),
2033 });
2034
2035 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
2036
2037 Ok(schema)
2038 }
2039}
2040
2041fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases {
2047 aliases.map(|aliases| {
2048 aliases
2049 .iter()
2050 .map(|alias| {
2051 if alias.find('.').is_none() {
2052 match namespace {
2053 Some(ns) => format!("{ns}.{alias}"),
2054 None => alias.clone(),
2055 }
2056 } else {
2057 alias.clone()
2058 }
2059 })
2060 .map(|alias| Alias::new(alias.as_str()).unwrap())
2061 .collect()
2062 })
2063}
2064
2065fn get_schema_type_name(name: Name, value: Value) -> Name {
2066 match value.get("type") {
2067 Some(Value::Object(complex_type)) => match complex_type.name() {
2068 Some(name) => Name::new(name.as_str()).unwrap(),
2069 _ => name,
2070 },
2071 _ => name,
2072 }
2073}
2074
2075impl Serialize for Schema {
2076 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2077 where
2078 S: Serializer,
2079 {
2080 match *self {
2081 Schema::Ref { ref name } => serializer.serialize_str(&name.fullname(None)),
2082 Schema::Null => serializer.serialize_str("null"),
2083 Schema::Boolean => serializer.serialize_str("boolean"),
2084 Schema::Int => serializer.serialize_str("int"),
2085 Schema::Long => serializer.serialize_str("long"),
2086 Schema::Float => serializer.serialize_str("float"),
2087 Schema::Double => serializer.serialize_str("double"),
2088 Schema::Bytes => serializer.serialize_str("bytes"),
2089 Schema::String => serializer.serialize_str("string"),
2090 Schema::Array(ref inner) => {
2091 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2092 map.serialize_entry("type", "array")?;
2093 map.serialize_entry("items", &*inner.items.clone())?;
2094 for attr in &inner.attributes {
2095 map.serialize_entry(attr.0, attr.1)?;
2096 }
2097 map.end()
2098 }
2099 Schema::Map(ref inner) => {
2100 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2101 map.serialize_entry("type", "map")?;
2102 map.serialize_entry("values", &*inner.types.clone())?;
2103 for attr in &inner.attributes {
2104 map.serialize_entry(attr.0, attr.1)?;
2105 }
2106 map.end()
2107 }
2108 Schema::Union(ref inner) => {
2109 let variants = inner.variants();
2110 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2111 for v in variants {
2112 seq.serialize_element(v)?;
2113 }
2114 seq.end()
2115 }
2116 Schema::Record(RecordSchema {
2117 ref name,
2118 ref aliases,
2119 ref doc,
2120 ref fields,
2121 ref attributes,
2122 ..
2123 }) => {
2124 let mut map = serializer.serialize_map(None)?;
2125 map.serialize_entry("type", "record")?;
2126 if let Some(ref n) = name.namespace {
2127 map.serialize_entry("namespace", n)?;
2128 }
2129 map.serialize_entry("name", &name.name)?;
2130 if let Some(docstr) = doc {
2131 map.serialize_entry("doc", docstr)?;
2132 }
2133 if let Some(aliases) = aliases {
2134 map.serialize_entry("aliases", aliases)?;
2135 }
2136 map.serialize_entry("fields", fields)?;
2137 for attr in attributes {
2138 map.serialize_entry(attr.0, attr.1)?;
2139 }
2140 map.end()
2141 }
2142 Schema::Enum(EnumSchema {
2143 ref name,
2144 ref symbols,
2145 ref aliases,
2146 ref attributes,
2147 ..
2148 }) => {
2149 let mut map = serializer.serialize_map(None)?;
2150 map.serialize_entry("type", "enum")?;
2151 if let Some(ref n) = name.namespace {
2152 map.serialize_entry("namespace", n)?;
2153 }
2154 map.serialize_entry("name", &name.name)?;
2155 map.serialize_entry("symbols", symbols)?;
2156
2157 if let Some(aliases) = aliases {
2158 map.serialize_entry("aliases", aliases)?;
2159 }
2160 for attr in attributes {
2161 map.serialize_entry(attr.0, attr.1)?;
2162 }
2163 map.end()
2164 }
2165 Schema::Fixed(ref fixed_schema) => {
2166 let mut map = serializer.serialize_map(None)?;
2167 map = fixed_schema.serialize_to_map::<S>(map)?;
2168 map.end()
2169 }
2170 Schema::Decimal(DecimalSchema {
2171 ref scale,
2172 ref precision,
2173 ref inner,
2174 }) => {
2175 let mut map = serializer.serialize_map(None)?;
2176 match inner.as_ref() {
2177 Schema::Fixed(fixed_schema) => {
2178 map = fixed_schema.serialize_to_map::<S>(map)?;
2179 }
2180 Schema::Bytes => {
2181 map.serialize_entry("type", "bytes")?;
2182 }
2183 others => {
2184 return Err(serde::ser::Error::custom(format!(
2185 "DecimalSchema inner type must be Fixed or Bytes, got {:?}",
2186 SchemaKind::from(others)
2187 )));
2188 }
2189 }
2190 map.serialize_entry("logicalType", "decimal")?;
2191 map.serialize_entry("scale", scale)?;
2192 map.serialize_entry("precision", precision)?;
2193 map.end()
2194 }
2195
2196 Schema::BigDecimal => {
2197 let mut map = serializer.serialize_map(None)?;
2198 map.serialize_entry("type", "bytes")?;
2199 map.serialize_entry("logicalType", "big-decimal")?;
2200 map.end()
2201 }
2202 Schema::Uuid => {
2203 let mut map = serializer.serialize_map(None)?;
2204 map.serialize_entry("type", "string")?;
2205 map.serialize_entry("logicalType", "uuid")?;
2206 map.end()
2207 }
2208 Schema::Date => {
2209 let mut map = serializer.serialize_map(None)?;
2210 map.serialize_entry("type", "int")?;
2211 map.serialize_entry("logicalType", "date")?;
2212 map.end()
2213 }
2214 Schema::TimeMillis => {
2215 let mut map = serializer.serialize_map(None)?;
2216 map.serialize_entry("type", "int")?;
2217 map.serialize_entry("logicalType", "time-millis")?;
2218 map.end()
2219 }
2220 Schema::TimeMicros => {
2221 let mut map = serializer.serialize_map(None)?;
2222 map.serialize_entry("type", "long")?;
2223 map.serialize_entry("logicalType", "time-micros")?;
2224 map.end()
2225 }
2226 Schema::TimestampMillis => {
2227 let mut map = serializer.serialize_map(None)?;
2228 map.serialize_entry("type", "long")?;
2229 map.serialize_entry("logicalType", "timestamp-millis")?;
2230 map.end()
2231 }
2232 Schema::TimestampMicros => {
2233 let mut map = serializer.serialize_map(None)?;
2234 map.serialize_entry("type", "long")?;
2235 map.serialize_entry("logicalType", "timestamp-micros")?;
2236 map.end()
2237 }
2238 Schema::TimestampNanos => {
2239 let mut map = serializer.serialize_map(None)?;
2240 map.serialize_entry("type", "long")?;
2241 map.serialize_entry("logicalType", "timestamp-nanos")?;
2242 map.end()
2243 }
2244 Schema::LocalTimestampMillis => {
2245 let mut map = serializer.serialize_map(None)?;
2246 map.serialize_entry("type", "long")?;
2247 map.serialize_entry("logicalType", "local-timestamp-millis")?;
2248 map.end()
2249 }
2250 Schema::LocalTimestampMicros => {
2251 let mut map = serializer.serialize_map(None)?;
2252 map.serialize_entry("type", "long")?;
2253 map.serialize_entry("logicalType", "local-timestamp-micros")?;
2254 map.end()
2255 }
2256 Schema::LocalTimestampNanos => {
2257 let mut map = serializer.serialize_map(None)?;
2258 map.serialize_entry("type", "long")?;
2259 map.serialize_entry("logicalType", "local-timestamp-nanos")?;
2260 map.end()
2261 }
2262 Schema::Duration => {
2263 let mut map = serializer.serialize_map(None)?;
2264
2265 let inner = Schema::Fixed(FixedSchema {
2268 name: Name::new("duration").unwrap(),
2269 aliases: None,
2270 doc: None,
2271 size: 12,
2272 default: None,
2273 attributes: Default::default(),
2274 });
2275 map.serialize_entry("type", &inner)?;
2276 map.serialize_entry("logicalType", "duration")?;
2277 map.end()
2278 }
2279 }
2280 }
2281}
2282
2283impl Serialize for RecordField {
2284 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2285 where
2286 S: Serializer,
2287 {
2288 let mut map = serializer.serialize_map(None)?;
2289 map.serialize_entry("name", &self.name)?;
2290 map.serialize_entry("type", &self.schema)?;
2291
2292 if let Some(ref default) = self.default {
2293 map.serialize_entry("default", default)?;
2294 }
2295
2296 if let Some(ref aliases) = self.aliases {
2297 map.serialize_entry("aliases", aliases)?;
2298 }
2299
2300 for attr in &self.custom_attributes {
2301 map.serialize_entry(attr.0, attr.1)?;
2302 }
2303
2304 map.end()
2305 }
2306}
2307
2308fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet<String>) -> String {
2311 match schema {
2312 Value::Object(map) => pcf_map(map, defined_names),
2313 Value::String(s) => pcf_string(s),
2314 Value::Array(v) => pcf_array(v, defined_names),
2315 json => panic!("got invalid JSON value for canonical form of schema: {json}"),
2316 }
2317}
2318
2319fn pcf_map(schema: &Map<String, Value>, defined_names: &mut HashSet<String>) -> String {
2320 let typ = schema.get("type").and_then(|v| v.as_str());
2321 let name = if is_named_type(typ) {
2322 let ns = schema.get("namespace").and_then(|v| v.as_str());
2323 let raw_name = schema.get("name").and_then(|v| v.as_str());
2324 Some(format!(
2325 "{}{}",
2326 ns.map_or("".to_string(), |n| { format!("{n}.") }),
2327 raw_name.unwrap_or_default()
2328 ))
2329 } else {
2330 None
2331 };
2332
2333 if let Some(ref n) = name {
2335 if defined_names.contains(n) {
2336 return pcf_string(n);
2337 } else {
2338 defined_names.insert(n.clone());
2339 }
2340 }
2341
2342 let mut fields = Vec::new();
2343 for (k, v) in schema {
2344 if schema.len() == 1 && k == "type" {
2346 if let Value::String(s) = v {
2348 return pcf_string(s);
2349 }
2350 }
2351
2352 if field_ordering_position(k).is_none()
2354 || k == "default"
2355 || k == "doc"
2356 || k == "aliases"
2357 || k == "logicalType"
2358 {
2359 continue;
2360 }
2361
2362 if k == "name" {
2364 if let Some(ref n) = name {
2365 fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n))));
2366 continue;
2367 }
2368 }
2369
2370 if k == "size" || k == "precision" || k == "scale" {
2372 let i = match v.as_str() {
2373 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2374 None => v.as_i64().unwrap(),
2375 };
2376 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2377 continue;
2378 }
2379
2380 fields.push((
2382 k,
2383 format!(
2384 "{}:{}",
2385 pcf_string(k),
2386 parsing_canonical_form(v, defined_names)
2387 ),
2388 ));
2389 }
2390
2391 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2393 let inter = fields
2394 .into_iter()
2395 .map(|(_, v)| v)
2396 .collect::<Vec<_>>()
2397 .join(",");
2398 format!("{{{inter}}}")
2399}
2400
2401fn is_named_type(typ: Option<&str>) -> bool {
2402 matches!(
2403 typ,
2404 Some("record") | Some("enum") | Some("fixed") | Some("ref")
2405 )
2406}
2407
2408fn pcf_array(arr: &[Value], defined_names: &mut HashSet<String>) -> String {
2409 let inter = arr
2410 .iter()
2411 .map(|a| parsing_canonical_form(a, defined_names))
2412 .collect::<Vec<String>>()
2413 .join(",");
2414 format!("[{inter}]")
2415}
2416
2417fn pcf_string(s: &str) -> String {
2418 format!("\"{s}\"")
2419}
2420
2421const RESERVED_FIELDS: &[&str] = &[
2422 "name",
2423 "type",
2424 "fields",
2425 "symbols",
2426 "items",
2427 "values",
2428 "size",
2429 "logicalType",
2430 "order",
2431 "doc",
2432 "aliases",
2433 "default",
2434 "precision",
2435 "scale",
2436];
2437
2438fn field_ordering_position(field: &str) -> Option<usize> {
2440 RESERVED_FIELDS
2441 .iter()
2442 .position(|&f| f == field)
2443 .map(|pos| pos + 1)
2444}
2445
2446pub trait AvroSchema {
2451 fn get_schema() -> Schema;
2452}
2453
2454#[cfg(feature = "derive")]
2455pub mod derive {
2456 use super::*;
2457 use std::borrow::Cow;
2458
2459 pub trait AvroSchemaComponent {
2508 fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
2509 -> Schema;
2510 }
2511
2512 impl<T> AvroSchema for T
2513 where
2514 T: AvroSchemaComponent,
2515 {
2516 fn get_schema() -> Schema {
2517 T::get_schema_in_ctxt(&mut HashMap::default(), &None)
2518 }
2519 }
2520
2521 macro_rules! impl_schema (
2522 ($type:ty, $variant_constructor:expr) => (
2523 impl AvroSchemaComponent for $type {
2524 fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
2525 $variant_constructor
2526 }
2527 }
2528 );
2529 );
2530
2531 impl_schema!(bool, Schema::Boolean);
2532 impl_schema!(i8, Schema::Int);
2533 impl_schema!(i16, Schema::Int);
2534 impl_schema!(i32, Schema::Int);
2535 impl_schema!(i64, Schema::Long);
2536 impl_schema!(u8, Schema::Int);
2537 impl_schema!(u16, Schema::Int);
2538 impl_schema!(u32, Schema::Long);
2539 impl_schema!(f32, Schema::Float);
2540 impl_schema!(f64, Schema::Double);
2541 impl_schema!(String, Schema::String);
2542 impl_schema!(uuid::Uuid, Schema::Uuid);
2543 impl_schema!(core::time::Duration, Schema::Duration);
2544
2545 impl<T> AvroSchemaComponent for Vec<T>
2546 where
2547 T: AvroSchemaComponent,
2548 {
2549 fn get_schema_in_ctxt(
2550 named_schemas: &mut Names,
2551 enclosing_namespace: &Namespace,
2552 ) -> Schema {
2553 Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2554 }
2555 }
2556
2557 impl<T> AvroSchemaComponent for Option<T>
2558 where
2559 T: AvroSchemaComponent,
2560 {
2561 fn get_schema_in_ctxt(
2562 named_schemas: &mut Names,
2563 enclosing_namespace: &Namespace,
2564 ) -> Schema {
2565 let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
2566 Schema::Union(UnionSchema {
2567 schemas: vec![Schema::Null, inner_schema.clone()],
2568 variant_index: [Schema::Null, inner_schema]
2569 .iter()
2570 .enumerate()
2571 .map(|(idx, s)| (SchemaKind::from(s), idx))
2572 .collect(),
2573 })
2574 }
2575 }
2576
2577 impl<T> AvroSchemaComponent for Map<String, T>
2578 where
2579 T: AvroSchemaComponent,
2580 {
2581 fn get_schema_in_ctxt(
2582 named_schemas: &mut Names,
2583 enclosing_namespace: &Namespace,
2584 ) -> Schema {
2585 Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2586 }
2587 }
2588
2589 impl<T> AvroSchemaComponent for HashMap<String, T>
2590 where
2591 T: AvroSchemaComponent,
2592 {
2593 fn get_schema_in_ctxt(
2594 named_schemas: &mut Names,
2595 enclosing_namespace: &Namespace,
2596 ) -> Schema {
2597 Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2598 }
2599 }
2600
2601 impl<T> AvroSchemaComponent for Box<T>
2602 where
2603 T: AvroSchemaComponent,
2604 {
2605 fn get_schema_in_ctxt(
2606 named_schemas: &mut Names,
2607 enclosing_namespace: &Namespace,
2608 ) -> Schema {
2609 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2610 }
2611 }
2612
2613 impl<T> AvroSchemaComponent for std::sync::Mutex<T>
2614 where
2615 T: AvroSchemaComponent,
2616 {
2617 fn get_schema_in_ctxt(
2618 named_schemas: &mut Names,
2619 enclosing_namespace: &Namespace,
2620 ) -> Schema {
2621 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2622 }
2623 }
2624
2625 impl<T> AvroSchemaComponent for Cow<'_, T>
2626 where
2627 T: AvroSchemaComponent + Clone,
2628 {
2629 fn get_schema_in_ctxt(
2630 named_schemas: &mut Names,
2631 enclosing_namespace: &Namespace,
2632 ) -> Schema {
2633 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2634 }
2635 }
2636}
2637
2638#[cfg(test)]
2639mod tests {
2640 use super::*;
2641 use crate::{error::Details, rabin::Rabin};
2642 use apache_avro_test_helper::{
2643 TestResult,
2644 logger::{assert_logged, assert_not_logged},
2645 };
2646 use serde_json::json;
2647
2648 #[test]
2649 fn test_invalid_schema() {
2650 assert!(Schema::parse_str("invalid").is_err());
2651 }
2652
2653 #[test]
2654 fn test_primitive_schema() -> TestResult {
2655 assert_eq!(Schema::Null, Schema::parse_str("\"null\"")?);
2656 assert_eq!(Schema::Int, Schema::parse_str("\"int\"")?);
2657 assert_eq!(Schema::Double, Schema::parse_str("\"double\"")?);
2658 Ok(())
2659 }
2660
2661 #[test]
2662 fn test_array_schema() -> TestResult {
2663 let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?;
2664 assert_eq!(Schema::array(Schema::String), schema);
2665 Ok(())
2666 }
2667
2668 #[test]
2669 fn test_map_schema() -> TestResult {
2670 let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?;
2671 assert_eq!(Schema::map(Schema::Double), schema);
2672 Ok(())
2673 }
2674
2675 #[test]
2676 fn test_union_schema() -> TestResult {
2677 let schema = Schema::parse_str(r#"["null", "int"]"#)?;
2678 assert_eq!(
2679 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
2680 schema
2681 );
2682 Ok(())
2683 }
2684
2685 #[test]
2686 fn test_union_unsupported_schema() {
2687 let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
2688 assert!(schema.is_err());
2689 }
2690
2691 #[test]
2692 fn test_multi_union_schema() -> TestResult {
2693 let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
2694 assert!(schema.is_ok());
2695 let schema = schema?;
2696 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2697 let union_schema = match schema {
2698 Schema::Union(u) => u,
2699 _ => unreachable!(),
2700 };
2701 assert_eq!(union_schema.variants().len(), 5);
2702 let mut variants = union_schema.variants().iter();
2703 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null);
2704 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int);
2705 assert_eq!(
2706 SchemaKind::from(variants.next().unwrap()),
2707 SchemaKind::Float
2708 );
2709 assert_eq!(
2710 SchemaKind::from(variants.next().unwrap()),
2711 SchemaKind::String
2712 );
2713 assert_eq!(
2714 SchemaKind::from(variants.next().unwrap()),
2715 SchemaKind::Bytes
2716 );
2717 assert_eq!(variants.next(), None);
2718
2719 Ok(())
2720 }
2721
2722 #[test]
2723 fn test_avro_3621_nullable_record_field() -> TestResult {
2724 let nullable_record_field = RecordField::builder()
2725 .name("next".to_string())
2726 .schema(Schema::Union(UnionSchema::new(vec![
2727 Schema::Null,
2728 Schema::Ref {
2729 name: Name {
2730 name: "LongList".to_owned(),
2731 namespace: None,
2732 },
2733 },
2734 ])?))
2735 .order(RecordFieldOrder::Ascending)
2736 .position(1)
2737 .build();
2738
2739 assert!(nullable_record_field.is_nullable());
2740
2741 let non_nullable_record_field = RecordField::builder()
2742 .name("next".to_string())
2743 .default(json!(2))
2744 .schema(Schema::Long)
2745 .order(RecordFieldOrder::Ascending)
2746 .position(1)
2747 .build();
2748
2749 assert!(!non_nullable_record_field.is_nullable());
2750 Ok(())
2751 }
2752
2753 #[test]
2755 fn test_union_of_records() -> TestResult {
2756 use std::iter::FromIterator;
2757
2758 let schema_str_a = r#"{
2760 "name": "A",
2761 "type": "record",
2762 "fields": [
2763 {"name": "field_one", "type": "float"}
2764 ]
2765 }"#;
2766
2767 let schema_str_b = r#"{
2768 "name": "B",
2769 "type": "record",
2770 "fields": [
2771 {"name": "field_one", "type": "float"}
2772 ]
2773 }"#;
2774
2775 let schema_str_c = r#"{
2777 "name": "C",
2778 "type": "record",
2779 "fields": [
2780 {"name": "field_one", "type": ["A", "B"]}
2781 ]
2782 }"#;
2783
2784 let schema_c = Schema::parse_list([schema_str_a, schema_str_b, schema_str_c])?
2785 .last()
2786 .unwrap()
2787 .clone();
2788
2789 let schema_c_expected = Schema::Record(
2790 RecordSchema::builder()
2791 .name(Name::new("C")?)
2792 .fields(vec![
2793 RecordField::builder()
2794 .name("field_one".to_string())
2795 .schema(Schema::Union(UnionSchema::new(vec![
2796 Schema::Ref {
2797 name: Name::new("A")?,
2798 },
2799 Schema::Ref {
2800 name: Name::new("B")?,
2801 },
2802 ])?))
2803 .build(),
2804 ])
2805 .lookup(BTreeMap::from_iter(vec![("field_one".to_string(), 0)]))
2806 .build(),
2807 );
2808
2809 assert_eq!(schema_c, schema_c_expected);
2810 Ok(())
2811 }
2812
2813 #[test]
2814 fn avro_rs_104_test_root_union_of_records() -> TestResult {
2815 let schema_str_a = r#"{
2817 "name": "A",
2818 "type": "record",
2819 "fields": [
2820 {"name": "field_one", "type": "float"}
2821 ]
2822 }"#;
2823
2824 let schema_str_b = r#"{
2825 "name": "B",
2826 "type": "record",
2827 "fields": [
2828 {"name": "field_one", "type": "float"}
2829 ]
2830 }"#;
2831
2832 let schema_str_c = r#"["A", "B"]"#;
2833
2834 let (schema_c, schemata) =
2835 Schema::parse_str_with_list(schema_str_c, [schema_str_a, schema_str_b])?;
2836
2837 let schema_a_expected = Schema::Record(RecordSchema {
2838 name: Name::new("A")?,
2839 aliases: None,
2840 doc: None,
2841 fields: vec![RecordField {
2842 name: "field_one".to_string(),
2843 doc: None,
2844 default: None,
2845 aliases: None,
2846 schema: Schema::Float,
2847 order: RecordFieldOrder::Ignore,
2848 position: 0,
2849 custom_attributes: Default::default(),
2850 }],
2851 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2852 attributes: Default::default(),
2853 });
2854
2855 let schema_b_expected = Schema::Record(RecordSchema {
2856 name: Name::new("B")?,
2857 aliases: None,
2858 doc: None,
2859 fields: vec![RecordField {
2860 name: "field_one".to_string(),
2861 doc: None,
2862 default: None,
2863 aliases: None,
2864 schema: Schema::Float,
2865 order: RecordFieldOrder::Ignore,
2866 position: 0,
2867 custom_attributes: Default::default(),
2868 }],
2869 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2870 attributes: Default::default(),
2871 });
2872
2873 let schema_c_expected = Schema::Union(UnionSchema::new(vec![
2874 Schema::Ref {
2875 name: Name::new("A")?,
2876 },
2877 Schema::Ref {
2878 name: Name::new("B")?,
2879 },
2880 ])?);
2881
2882 assert_eq!(schema_c, schema_c_expected);
2883 assert_eq!(schemata[0], schema_a_expected);
2884 assert_eq!(schemata[1], schema_b_expected);
2885
2886 Ok(())
2887 }
2888
2889 #[test]
2890 fn avro_rs_104_test_root_union_of_records_name_collision() -> TestResult {
2891 let schema_str_a1 = r#"{
2893 "name": "A",
2894 "type": "record",
2895 "fields": [
2896 {"name": "field_one", "type": "float"}
2897 ]
2898 }"#;
2899
2900 let schema_str_a2 = r#"{
2901 "name": "A",
2902 "type": "record",
2903 "fields": [
2904 {"name": "field_one", "type": "float"}
2905 ]
2906 }"#;
2907
2908 let schema_str_c = r#"["A", "A"]"#;
2909
2910 match Schema::parse_str_with_list(schema_str_c, [schema_str_a1, schema_str_a2]) {
2911 Ok(_) => unreachable!("Expected an error that the name is already defined"),
2912 Err(e) => assert_eq!(
2913 e.to_string(),
2914 "Two schemas with the same fullname were given: \"A\""
2915 ),
2916 }
2917
2918 Ok(())
2919 }
2920
2921 #[test]
2922 fn avro_rs_104_test_root_union_of_records_no_name() -> TestResult {
2923 let schema_str_a = r#"{
2924 "name": "A",
2925 "type": "record",
2926 "fields": [
2927 {"name": "field_one", "type": "float"}
2928 ]
2929 }"#;
2930
2931 let schema_str_b = r#"{
2933 "type": "record",
2934 "fields": [
2935 {"name": "field_one", "type": "float"}
2936 ]
2937 }"#;
2938
2939 let schema_str_c = r#"["A", "A"]"#;
2940
2941 match Schema::parse_str_with_list(schema_str_c, [schema_str_a, schema_str_b]) {
2942 Ok(_) => unreachable!("Expected an error that schema_str_b is missing a name field"),
2943 Err(e) => assert_eq!(e.to_string(), "No `name` field"),
2944 }
2945
2946 Ok(())
2947 }
2948
2949 #[test]
2950 fn avro_3584_test_recursion_records() -> TestResult {
2951 let schema_str_a = r#"{
2953 "name": "A",
2954 "type": "record",
2955 "fields": [ {"name": "field_one", "type": "B"} ]
2956 }"#;
2957
2958 let schema_str_b = r#"{
2959 "name": "B",
2960 "type": "record",
2961 "fields": [ {"name": "field_one", "type": "A"} ]
2962 }"#;
2963
2964 let list = Schema::parse_list([schema_str_a, schema_str_b])?;
2965
2966 let schema_a = list.first().unwrap().clone();
2967
2968 match schema_a {
2969 Schema::Record(RecordSchema { fields, .. }) => {
2970 let f1 = fields.first();
2971
2972 let ref_schema = Schema::Ref {
2973 name: Name::new("B")?,
2974 };
2975 assert_eq!(ref_schema, f1.unwrap().schema);
2976 }
2977 _ => panic!("Expected a record schema!"),
2978 }
2979
2980 Ok(())
2981 }
2982
2983 #[test]
2984 fn test_avro_3248_nullable_record() -> TestResult {
2985 use std::iter::FromIterator;
2986
2987 let schema_str_a = r#"{
2988 "name": "A",
2989 "type": "record",
2990 "fields": [
2991 {"name": "field_one", "type": "float"}
2992 ]
2993 }"#;
2994
2995 let schema_str_option_a = r#"{
2997 "name": "OptionA",
2998 "type": "record",
2999 "fields": [
3000 {"name": "field_one", "type": ["null", "A"], "default": null}
3001 ]
3002 }"#;
3003
3004 let schema_option_a = Schema::parse_list([schema_str_a, schema_str_option_a])?
3005 .last()
3006 .unwrap()
3007 .clone();
3008
3009 let schema_option_a_expected = Schema::Record(RecordSchema {
3010 name: Name::new("OptionA")?,
3011 aliases: None,
3012 doc: None,
3013 fields: vec![RecordField {
3014 name: "field_one".to_string(),
3015 doc: None,
3016 default: Some(Value::Null),
3017 aliases: None,
3018 schema: Schema::Union(UnionSchema::new(vec![
3019 Schema::Null,
3020 Schema::Ref {
3021 name: Name::new("A")?,
3022 },
3023 ])?),
3024 order: RecordFieldOrder::Ignore,
3025 position: 0,
3026 custom_attributes: Default::default(),
3027 }],
3028 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
3029 attributes: Default::default(),
3030 });
3031
3032 assert_eq!(schema_option_a, schema_option_a_expected);
3033
3034 Ok(())
3035 }
3036
3037 #[test]
3038 fn test_record_schema() -> TestResult {
3039 let parsed = Schema::parse_str(
3040 r#"
3041 {
3042 "type": "record",
3043 "name": "test",
3044 "fields": [
3045 {"name": "a", "type": "long", "default": 42},
3046 {"name": "b", "type": "string"}
3047 ]
3048 }
3049 "#,
3050 )?;
3051
3052 let mut lookup = BTreeMap::new();
3053 lookup.insert("a".to_owned(), 0);
3054 lookup.insert("b".to_owned(), 1);
3055
3056 let expected = Schema::Record(RecordSchema {
3057 name: Name::new("test")?,
3058 aliases: None,
3059 doc: None,
3060 fields: vec![
3061 RecordField {
3062 name: "a".to_string(),
3063 doc: None,
3064 default: Some(Value::Number(42i64.into())),
3065 aliases: None,
3066 schema: Schema::Long,
3067 order: RecordFieldOrder::Ascending,
3068 position: 0,
3069 custom_attributes: Default::default(),
3070 },
3071 RecordField {
3072 name: "b".to_string(),
3073 doc: None,
3074 default: None,
3075 aliases: None,
3076 schema: Schema::String,
3077 order: RecordFieldOrder::Ascending,
3078 position: 1,
3079 custom_attributes: Default::default(),
3080 },
3081 ],
3082 lookup,
3083 attributes: Default::default(),
3084 });
3085
3086 assert_eq!(parsed, expected);
3087
3088 Ok(())
3089 }
3090
3091 #[test]
3092 fn test_avro_3302_record_schema_with_currently_parsing_schema() -> TestResult {
3093 let schema = Schema::parse_str(
3094 r#"
3095 {
3096 "type": "record",
3097 "name": "test",
3098 "fields": [{
3099 "name": "recordField",
3100 "type": {
3101 "type": "record",
3102 "name": "Node",
3103 "fields": [
3104 {"name": "label", "type": "string"},
3105 {"name": "children", "type": {"type": "array", "items": "Node"}}
3106 ]
3107 }
3108 }]
3109 }
3110 "#,
3111 )?;
3112
3113 let mut lookup = BTreeMap::new();
3114 lookup.insert("recordField".to_owned(), 0);
3115
3116 let mut node_lookup = BTreeMap::new();
3117 node_lookup.insert("children".to_owned(), 1);
3118 node_lookup.insert("label".to_owned(), 0);
3119
3120 let expected = Schema::Record(RecordSchema {
3121 name: Name::new("test")?,
3122 aliases: None,
3123 doc: None,
3124 fields: vec![RecordField {
3125 name: "recordField".to_string(),
3126 doc: None,
3127 default: None,
3128 aliases: None,
3129 schema: Schema::Record(RecordSchema {
3130 name: Name::new("Node")?,
3131 aliases: None,
3132 doc: None,
3133 fields: vec![
3134 RecordField {
3135 name: "label".to_string(),
3136 doc: None,
3137 default: None,
3138 aliases: None,
3139 schema: Schema::String,
3140 order: RecordFieldOrder::Ascending,
3141 position: 0,
3142 custom_attributes: Default::default(),
3143 },
3144 RecordField {
3145 name: "children".to_string(),
3146 doc: None,
3147 default: None,
3148 aliases: None,
3149 schema: Schema::array(Schema::Ref {
3150 name: Name::new("Node")?,
3151 }),
3152 order: RecordFieldOrder::Ascending,
3153 position: 1,
3154 custom_attributes: Default::default(),
3155 },
3156 ],
3157 lookup: node_lookup,
3158 attributes: Default::default(),
3159 }),
3160 order: RecordFieldOrder::Ascending,
3161 position: 0,
3162 custom_attributes: Default::default(),
3163 }],
3164 lookup,
3165 attributes: Default::default(),
3166 });
3167 assert_eq!(schema, expected);
3168
3169 let canonical_form = &schema.canonical_form();
3170 let expected = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
3171 assert_eq!(canonical_form, &expected);
3172
3173 Ok(())
3174 }
3175
3176 #[test]
3178 fn test_parsing_of_recursive_type_enum() -> TestResult {
3179 let schema = r#"
3180 {
3181 "type": "record",
3182 "name": "User",
3183 "namespace": "office",
3184 "fields": [
3185 {
3186 "name": "details",
3187 "type": [
3188 {
3189 "type": "record",
3190 "name": "Employee",
3191 "fields": [
3192 {
3193 "name": "gender",
3194 "type": {
3195 "type": "enum",
3196 "name": "Gender",
3197 "symbols": [
3198 "male",
3199 "female"
3200 ]
3201 },
3202 "default": "female"
3203 }
3204 ]
3205 },
3206 {
3207 "type": "record",
3208 "name": "Manager",
3209 "fields": [
3210 {
3211 "name": "gender",
3212 "type": "Gender"
3213 }
3214 ]
3215 }
3216 ]
3217 }
3218 ]
3219 }
3220 "#;
3221
3222 let schema = Schema::parse_str(schema)?;
3223 let schema_str = schema.canonical_form();
3224 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"gender","type":{"name":"office.Gender","type":"enum","symbols":["male","female"]}}]},{"name":"office.Manager","type":"record","fields":[{"name":"gender","type":"office.Gender"}]}]}]}"#;
3225 assert_eq!(schema_str, expected);
3226
3227 Ok(())
3228 }
3229
3230 #[test]
3231 fn test_parsing_of_recursive_type_fixed() -> TestResult {
3232 let schema = r#"
3233 {
3234 "type": "record",
3235 "name": "User",
3236 "namespace": "office",
3237 "fields": [
3238 {
3239 "name": "details",
3240 "type": [
3241 {
3242 "type": "record",
3243 "name": "Employee",
3244 "fields": [
3245 {
3246 "name": "id",
3247 "type": {
3248 "type": "fixed",
3249 "name": "EmployeeId",
3250 "size": 16
3251 },
3252 "default": "female"
3253 }
3254 ]
3255 },
3256 {
3257 "type": "record",
3258 "name": "Manager",
3259 "fields": [
3260 {
3261 "name": "id",
3262 "type": "EmployeeId"
3263 }
3264 ]
3265 }
3266 ]
3267 }
3268 ]
3269 }
3270 "#;
3271
3272 let schema = Schema::parse_str(schema)?;
3273 let schema_str = schema.canonical_form();
3274 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"id","type":{"name":"office.EmployeeId","type":"fixed","size":16}}]},{"name":"office.Manager","type":"record","fields":[{"name":"id","type":"office.EmployeeId"}]}]}]}"#;
3275 assert_eq!(schema_str, expected);
3276
3277 Ok(())
3278 }
3279
3280 #[test]
3281 fn test_avro_3302_record_schema_with_currently_parsing_schema_aliases() -> TestResult {
3282 let schema = Schema::parse_str(
3283 r#"
3284 {
3285 "type": "record",
3286 "name": "LongList",
3287 "aliases": ["LinkedLongs"],
3288 "fields" : [
3289 {"name": "value", "type": "long"},
3290 {"name": "next", "type": ["null", "LinkedLongs"]}
3291 ]
3292 }
3293 "#,
3294 )?;
3295
3296 let mut lookup = BTreeMap::new();
3297 lookup.insert("value".to_owned(), 0);
3298 lookup.insert("next".to_owned(), 1);
3299
3300 let expected = Schema::Record(RecordSchema {
3301 name: Name {
3302 name: "LongList".to_owned(),
3303 namespace: None,
3304 },
3305 aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
3306 doc: None,
3307 fields: vec![
3308 RecordField {
3309 name: "value".to_string(),
3310 doc: None,
3311 default: None,
3312 aliases: None,
3313 schema: Schema::Long,
3314 order: RecordFieldOrder::Ascending,
3315 position: 0,
3316 custom_attributes: Default::default(),
3317 },
3318 RecordField {
3319 name: "next".to_string(),
3320 doc: None,
3321 default: None,
3322 aliases: None,
3323 schema: Schema::Union(UnionSchema::new(vec![
3324 Schema::Null,
3325 Schema::Ref {
3326 name: Name {
3327 name: "LongList".to_owned(),
3328 namespace: None,
3329 },
3330 },
3331 ])?),
3332 order: RecordFieldOrder::Ascending,
3333 position: 1,
3334 custom_attributes: Default::default(),
3335 },
3336 ],
3337 lookup,
3338 attributes: Default::default(),
3339 });
3340 assert_eq!(schema, expected);
3341
3342 let canonical_form = &schema.canonical_form();
3343 let expected = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
3344 assert_eq!(canonical_form, &expected);
3345
3346 Ok(())
3347 }
3348
3349 #[test]
3350 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_record() -> TestResult {
3351 let schema = Schema::parse_str(
3352 r#"
3353 {
3354 "type" : "record",
3355 "name" : "record",
3356 "fields" : [
3357 { "name" : "value", "type" : "long" },
3358 { "name" : "next", "type" : "record" }
3359 ]
3360 }
3361 "#,
3362 )?;
3363
3364 let mut lookup = BTreeMap::new();
3365 lookup.insert("value".to_owned(), 0);
3366 lookup.insert("next".to_owned(), 1);
3367
3368 let expected = Schema::Record(RecordSchema {
3369 name: Name {
3370 name: "record".to_owned(),
3371 namespace: None,
3372 },
3373 aliases: None,
3374 doc: None,
3375 fields: vec![
3376 RecordField {
3377 name: "value".to_string(),
3378 doc: None,
3379 default: None,
3380 aliases: None,
3381 schema: Schema::Long,
3382 order: RecordFieldOrder::Ascending,
3383 position: 0,
3384 custom_attributes: Default::default(),
3385 },
3386 RecordField {
3387 name: "next".to_string(),
3388 doc: None,
3389 default: None,
3390 aliases: None,
3391 schema: Schema::Ref {
3392 name: Name {
3393 name: "record".to_owned(),
3394 namespace: None,
3395 },
3396 },
3397 order: RecordFieldOrder::Ascending,
3398 position: 1,
3399 custom_attributes: Default::default(),
3400 },
3401 ],
3402 lookup,
3403 attributes: Default::default(),
3404 });
3405 assert_eq!(schema, expected);
3406
3407 let canonical_form = &schema.canonical_form();
3408 let expected = r#"{"name":"record","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":"record"}]}"#;
3409 assert_eq!(canonical_form, &expected);
3410
3411 Ok(())
3412 }
3413
3414 #[test]
3415 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_enum() -> TestResult {
3416 let schema = Schema::parse_str(
3417 r#"
3418 {
3419 "type" : "record",
3420 "name" : "record",
3421 "fields" : [
3422 {
3423 "type" : "enum",
3424 "name" : "enum",
3425 "symbols": ["one", "two", "three"]
3426 },
3427 { "name" : "next", "type" : "enum" }
3428 ]
3429 }
3430 "#,
3431 )?;
3432
3433 let mut lookup = BTreeMap::new();
3434 lookup.insert("enum".to_owned(), 0);
3435 lookup.insert("next".to_owned(), 1);
3436
3437 let expected = Schema::Record(RecordSchema {
3438 name: Name {
3439 name: "record".to_owned(),
3440 namespace: None,
3441 },
3442 aliases: None,
3443 doc: None,
3444 fields: vec![
3445 RecordField {
3446 name: "enum".to_string(),
3447 doc: None,
3448 default: None,
3449 aliases: None,
3450 schema: Schema::Enum(
3451 EnumSchema::builder()
3452 .name(Name::new("enum")?)
3453 .symbols(vec![
3454 "one".to_string(),
3455 "two".to_string(),
3456 "three".to_string(),
3457 ])
3458 .build(),
3459 ),
3460 order: RecordFieldOrder::Ascending,
3461 position: 0,
3462 custom_attributes: Default::default(),
3463 },
3464 RecordField {
3465 name: "next".to_string(),
3466 doc: None,
3467 default: None,
3468 aliases: None,
3469 schema: Schema::Enum(EnumSchema {
3470 name: Name {
3471 name: "enum".to_owned(),
3472 namespace: None,
3473 },
3474 aliases: None,
3475 doc: None,
3476 symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
3477 default: None,
3478 attributes: Default::default(),
3479 }),
3480 order: RecordFieldOrder::Ascending,
3481 position: 1,
3482 custom_attributes: Default::default(),
3483 },
3484 ],
3485 lookup,
3486 attributes: Default::default(),
3487 });
3488 assert_eq!(schema, expected);
3489
3490 let canonical_form = &schema.canonical_form();
3491 let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#;
3492 assert_eq!(canonical_form, &expected);
3493
3494 Ok(())
3495 }
3496
3497 #[test]
3498 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_fixed() -> TestResult {
3499 let schema = Schema::parse_str(
3500 r#"
3501 {
3502 "type" : "record",
3503 "name" : "record",
3504 "fields" : [
3505 {
3506 "type" : "fixed",
3507 "name" : "fixed",
3508 "size": 456
3509 },
3510 { "name" : "next", "type" : "fixed" }
3511 ]
3512 }
3513 "#,
3514 )?;
3515
3516 let mut lookup = BTreeMap::new();
3517 lookup.insert("fixed".to_owned(), 0);
3518 lookup.insert("next".to_owned(), 1);
3519
3520 let expected = Schema::Record(RecordSchema {
3521 name: Name {
3522 name: "record".to_owned(),
3523 namespace: None,
3524 },
3525 aliases: None,
3526 doc: None,
3527 fields: vec![
3528 RecordField {
3529 name: "fixed".to_string(),
3530 doc: None,
3531 default: None,
3532 aliases: None,
3533 schema: Schema::Fixed(FixedSchema {
3534 name: Name {
3535 name: "fixed".to_owned(),
3536 namespace: None,
3537 },
3538 aliases: None,
3539 doc: None,
3540 size: 456,
3541 default: None,
3542 attributes: Default::default(),
3543 }),
3544 order: RecordFieldOrder::Ascending,
3545 position: 0,
3546 custom_attributes: Default::default(),
3547 },
3548 RecordField {
3549 name: "next".to_string(),
3550 doc: None,
3551 default: None,
3552 aliases: None,
3553 schema: Schema::Fixed(FixedSchema {
3554 name: Name {
3555 name: "fixed".to_owned(),
3556 namespace: None,
3557 },
3558 aliases: None,
3559 doc: None,
3560 size: 456,
3561 default: None,
3562 attributes: Default::default(),
3563 }),
3564 order: RecordFieldOrder::Ascending,
3565 position: 1,
3566 custom_attributes: Default::default(),
3567 },
3568 ],
3569 lookup,
3570 attributes: Default::default(),
3571 });
3572 assert_eq!(schema, expected);
3573
3574 let canonical_form = &schema.canonical_form();
3575 let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#;
3576 assert_eq!(canonical_form, &expected);
3577
3578 Ok(())
3579 }
3580
3581 #[test]
3582 fn test_enum_schema() -> TestResult {
3583 let schema = Schema::parse_str(
3584 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
3585 )?;
3586
3587 let expected = Schema::Enum(EnumSchema {
3588 name: Name::new("Suit")?,
3589 aliases: None,
3590 doc: None,
3591 symbols: vec![
3592 "diamonds".to_owned(),
3593 "spades".to_owned(),
3594 "clubs".to_owned(),
3595 "hearts".to_owned(),
3596 ],
3597 default: None,
3598 attributes: Default::default(),
3599 });
3600
3601 assert_eq!(expected, schema);
3602
3603 Ok(())
3604 }
3605
3606 #[test]
3607 fn test_enum_schema_duplicate() -> TestResult {
3608 let schema = Schema::parse_str(
3610 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "diamonds"]}"#,
3611 );
3612 assert!(schema.is_err());
3613
3614 Ok(())
3615 }
3616
3617 #[test]
3618 fn test_enum_schema_name() -> TestResult {
3619 let schema = Schema::parse_str(
3621 r#"{"type": "enum", "name": "Enum", "symbols": ["0000", "variant"]}"#,
3622 );
3623 assert!(schema.is_err());
3624
3625 Ok(())
3626 }
3627
3628 #[test]
3629 fn test_fixed_schema() -> TestResult {
3630 let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#)?;
3631
3632 let expected = Schema::Fixed(FixedSchema {
3633 name: Name::new("test")?,
3634 aliases: None,
3635 doc: None,
3636 size: 16_usize,
3637 default: None,
3638 attributes: Default::default(),
3639 });
3640
3641 assert_eq!(expected, schema);
3642
3643 Ok(())
3644 }
3645
3646 #[test]
3647 fn test_fixed_schema_with_documentation() -> TestResult {
3648 let schema = Schema::parse_str(
3649 r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#,
3650 )?;
3651
3652 let expected = Schema::Fixed(FixedSchema {
3653 name: Name::new("test")?,
3654 aliases: None,
3655 doc: Some(String::from("FixedSchema documentation")),
3656 size: 16_usize,
3657 default: None,
3658 attributes: Default::default(),
3659 });
3660
3661 assert_eq!(expected, schema);
3662
3663 Ok(())
3664 }
3665
3666 #[test]
3667 fn test_no_documentation() -> TestResult {
3668 let schema = Schema::parse_str(
3669 r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#,
3670 )?;
3671
3672 let doc = match schema {
3673 Schema::Enum(EnumSchema { doc, .. }) => doc,
3674 _ => unreachable!(),
3675 };
3676
3677 assert!(doc.is_none());
3678
3679 Ok(())
3680 }
3681
3682 #[test]
3683 fn test_documentation() -> TestResult {
3684 let schema = Schema::parse_str(
3685 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#,
3686 )?;
3687
3688 let doc = match schema {
3689 Schema::Enum(EnumSchema { doc, .. }) => doc,
3690 _ => None,
3691 };
3692
3693 assert_eq!("Some documentation".to_owned(), doc.unwrap());
3694
3695 Ok(())
3696 }
3697
3698 #[test]
3701 fn test_schema_is_send() {
3702 fn send<S: Send>(_s: S) {}
3703
3704 let schema = Schema::Null;
3705 send(schema);
3706 }
3707
3708 #[test]
3709 fn test_schema_is_sync() {
3710 fn sync<S: Sync>(_s: S) {}
3711
3712 let schema = Schema::Null;
3713 sync(&schema);
3714 sync(schema);
3715 }
3716
3717 #[test]
3718 fn test_schema_fingerprint() -> TestResult {
3719 use crate::rabin::Rabin;
3720 use md5::Md5;
3721 use sha2::Sha256;
3722
3723 let raw_schema = r#"
3724 {
3725 "type": "record",
3726 "name": "test",
3727 "fields": [
3728 {"name": "a", "type": "long", "default": 42},
3729 {"name": "b", "type": "string"},
3730 {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
3731 ]
3732 }
3733"#;
3734
3735 let schema = Schema::parse_str(raw_schema)?;
3736 assert_eq!(
3737 "7eb3b28d73dfc99bdd9af1848298b40804a2f8ad5d2642be2ecc2ad34842b987",
3738 format!("{}", schema.fingerprint::<Sha256>())
3739 );
3740
3741 assert_eq!(
3742 "cb11615e412ee5d872620d8df78ff6ae",
3743 format!("{}", schema.fingerprint::<Md5>())
3744 );
3745 assert_eq!(
3746 "92f2ccef718c6754",
3747 format!("{}", schema.fingerprint::<Rabin>())
3748 );
3749
3750 Ok(())
3751 }
3752
3753 #[test]
3754 fn test_logical_types() -> TestResult {
3755 let schema = Schema::parse_str(r#"{"type": "int", "logicalType": "date"}"#)?;
3756 assert_eq!(schema, Schema::Date);
3757
3758 let schema = Schema::parse_str(r#"{"type": "long", "logicalType": "timestamp-micros"}"#)?;
3759 assert_eq!(schema, Schema::TimestampMicros);
3760
3761 Ok(())
3762 }
3763
3764 #[test]
3765 fn test_nullable_logical_type() -> TestResult {
3766 let schema = Schema::parse_str(
3767 r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
3768 )?;
3769 assert_eq!(
3770 schema,
3771 Schema::Union(UnionSchema::new(vec![
3772 Schema::Null,
3773 Schema::TimestampMicros,
3774 ])?)
3775 );
3776
3777 Ok(())
3778 }
3779
3780 #[test]
3781 fn record_field_order_from_str() -> TestResult {
3782 use std::str::FromStr;
3783
3784 assert_eq!(
3785 RecordFieldOrder::from_str("ascending").unwrap(),
3786 RecordFieldOrder::Ascending
3787 );
3788 assert_eq!(
3789 RecordFieldOrder::from_str("descending").unwrap(),
3790 RecordFieldOrder::Descending
3791 );
3792 assert_eq!(
3793 RecordFieldOrder::from_str("ignore").unwrap(),
3794 RecordFieldOrder::Ignore
3795 );
3796 assert!(RecordFieldOrder::from_str("not an ordering").is_err());
3797
3798 Ok(())
3799 }
3800
3801 #[test]
3802 fn test_avro_3374_preserve_namespace_for_primitive() -> TestResult {
3803 let schema = Schema::parse_str(
3804 r#"
3805 {
3806 "type" : "record",
3807 "name" : "ns.int",
3808 "fields" : [
3809 {"name" : "value", "type" : "int"},
3810 {"name" : "next", "type" : [ "null", "ns.int" ]}
3811 ]
3812 }
3813 "#,
3814 )?;
3815
3816 let json = schema.canonical_form();
3817 assert_eq!(
3818 json,
3819 r#"{"name":"ns.int","type":"record","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","ns.int"]}]}"#
3820 );
3821
3822 Ok(())
3823 }
3824
3825 #[test]
3826 fn test_avro_3433_preserve_schema_refs_in_json() -> TestResult {
3827 let schema = r#"
3828 {
3829 "name": "test.test",
3830 "type": "record",
3831 "fields": [
3832 {
3833 "name": "bar",
3834 "type": { "name": "test.foo", "type": "record", "fields": [{ "name": "id", "type": "long" }] }
3835 },
3836 { "name": "baz", "type": "test.foo" }
3837 ]
3838 }
3839 "#;
3840
3841 let schema = Schema::parse_str(schema)?;
3842
3843 let expected = r#"{"name":"test.test","type":"record","fields":[{"name":"bar","type":{"name":"test.foo","type":"record","fields":[{"name":"id","type":"long"}]}},{"name":"baz","type":"test.foo"}]}"#;
3844 assert_eq!(schema.canonical_form(), expected);
3845
3846 Ok(())
3847 }
3848
3849 #[test]
3850 fn test_read_namespace_from_name() -> TestResult {
3851 let schema = r#"
3852 {
3853 "name": "space.name",
3854 "type": "record",
3855 "fields": [
3856 {
3857 "name": "num",
3858 "type": "int"
3859 }
3860 ]
3861 }
3862 "#;
3863
3864 let schema = Schema::parse_str(schema)?;
3865 if let Schema::Record(RecordSchema { name, .. }) = schema {
3866 assert_eq!(name.name, "name");
3867 assert_eq!(name.namespace, Some("space".to_string()));
3868 } else {
3869 panic!("Expected a record schema!");
3870 }
3871
3872 Ok(())
3873 }
3874
3875 #[test]
3876 fn test_namespace_from_name_has_priority_over_from_field() -> TestResult {
3877 let schema = r#"
3878 {
3879 "name": "space1.name",
3880 "namespace": "space2",
3881 "type": "record",
3882 "fields": [
3883 {
3884 "name": "num",
3885 "type": "int"
3886 }
3887 ]
3888 }
3889 "#;
3890
3891 let schema = Schema::parse_str(schema)?;
3892 if let Schema::Record(RecordSchema { name, .. }) = schema {
3893 assert_eq!(name.namespace, Some("space1".to_string()));
3894 } else {
3895 panic!("Expected a record schema!");
3896 }
3897
3898 Ok(())
3899 }
3900
3901 #[test]
3902 fn test_namespace_from_field() -> TestResult {
3903 let schema = r#"
3904 {
3905 "name": "name",
3906 "namespace": "space2",
3907 "type": "record",
3908 "fields": [
3909 {
3910 "name": "num",
3911 "type": "int"
3912 }
3913 ]
3914 }
3915 "#;
3916
3917 let schema = Schema::parse_str(schema)?;
3918 if let Schema::Record(RecordSchema { name, .. }) = schema {
3919 assert_eq!(name.namespace, Some("space2".to_string()));
3920 } else {
3921 panic!("Expected a record schema!");
3922 }
3923
3924 Ok(())
3925 }
3926
3927 #[test]
3928 fn test_namespace_from_name_with_empty_value() -> TestResult {
3930 let name = Name::new(".name")?;
3931 assert_eq!(name.name, "name");
3932 assert_eq!(name.namespace, None);
3933
3934 Ok(())
3935 }
3936
3937 #[test]
3938 fn test_name_with_whitespace_value() {
3940 match Name::new(" ").map_err(Error::into_details) {
3941 Err(Details::InvalidSchemaName(_, _)) => {}
3942 _ => panic!("Expected an Details::InvalidSchemaName!"),
3943 }
3944 }
3945
3946 #[test]
3947 fn test_name_with_no_name_part() {
3949 match Name::new("space.").map_err(Error::into_details) {
3950 Err(Details::InvalidSchemaName(_, _)) => {}
3951 _ => panic!("Expected an Details::InvalidSchemaName!"),
3952 }
3953 }
3954
3955 #[test]
3956 fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() -> TestResult {
3957 let schema = r#"
3958 {
3959 "name": "record_name",
3960 "namespace": "space",
3961 "type": "record",
3962 "fields": [
3963 {
3964 "name": "outer_field_1",
3965 "type": [
3966 "null",
3967 {
3968 "type":"record",
3969 "name":"inner_record_name",
3970 "fields":[
3971 {
3972 "name":"inner_field_1",
3973 "type":"double"
3974 }
3975 ]
3976 }
3977 ]
3978 },
3979 {
3980 "name": "outer_field_2",
3981 "type" : "inner_record_name"
3982 }
3983 ]
3984 }
3985 "#;
3986 let schema = Schema::parse_str(schema)?;
3987 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
3988 assert_eq!(rs.get_names().len(), 2);
3989 for s in &["space.record_name", "space.inner_record_name"] {
3990 assert!(rs.get_names().contains_key(&Name::new(s)?));
3991 }
3992
3993 Ok(())
3994 }
3995
3996 #[test]
3997 fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() -> TestResult {
3998 let schema = r#"
3999 {
4000 "name": "record_name",
4001 "namespace": "space",
4002 "type": "record",
4003 "fields": [
4004 {
4005 "name": "outer_field_1",
4006 "type": [
4007 "null",
4008 {
4009 "type":"record",
4010 "name":"inner_record_name",
4011 "fields":[
4012 {
4013 "name":"inner_field_1",
4014 "type":"double"
4015 }
4016 ]
4017 }
4018 ]
4019 },
4020 {
4021 "name": "outer_field_2",
4022 "type" : "space.inner_record_name"
4023 }
4024 ]
4025 }
4026 "#;
4027 let schema = Schema::parse_str(schema)?;
4028 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4029 assert_eq!(rs.get_names().len(), 2);
4030 for s in &["space.record_name", "space.inner_record_name"] {
4031 assert!(rs.get_names().contains_key(&Name::new(s)?));
4032 }
4033
4034 Ok(())
4035 }
4036
4037 #[test]
4038 fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() -> TestResult {
4039 let schema = r#"
4040 {
4041 "name": "record_name",
4042 "namespace": "space",
4043 "type": "record",
4044 "fields": [
4045 {
4046 "name": "outer_field_1",
4047 "type": [
4048 "null",
4049 {
4050 "type":"enum",
4051 "name":"inner_enum_name",
4052 "symbols":["Extensive","Testing"]
4053 }
4054 ]
4055 },
4056 {
4057 "name": "outer_field_2",
4058 "type" : "inner_enum_name"
4059 }
4060 ]
4061 }
4062 "#;
4063 let schema = Schema::parse_str(schema)?;
4064 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4065 assert_eq!(rs.get_names().len(), 2);
4066 for s in &["space.record_name", "space.inner_enum_name"] {
4067 assert!(rs.get_names().contains_key(&Name::new(s)?));
4068 }
4069
4070 Ok(())
4071 }
4072
4073 #[test]
4074 fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() -> TestResult {
4075 let schema = r#"
4076 {
4077 "name": "record_name",
4078 "namespace": "space",
4079 "type": "record",
4080 "fields": [
4081 {
4082 "name": "outer_field_1",
4083 "type": [
4084 "null",
4085 {
4086 "type":"enum",
4087 "name":"inner_enum_name",
4088 "symbols":["Extensive","Testing"]
4089 }
4090 ]
4091 },
4092 {
4093 "name": "outer_field_2",
4094 "type" : "space.inner_enum_name"
4095 }
4096 ]
4097 }
4098 "#;
4099 let schema = Schema::parse_str(schema)?;
4100 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4101 assert_eq!(rs.get_names().len(), 2);
4102 for s in &["space.record_name", "space.inner_enum_name"] {
4103 assert!(rs.get_names().contains_key(&Name::new(s)?));
4104 }
4105
4106 Ok(())
4107 }
4108
4109 #[test]
4110 fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() -> TestResult {
4111 let schema = r#"
4112 {
4113 "name": "record_name",
4114 "namespace": "space",
4115 "type": "record",
4116 "fields": [
4117 {
4118 "name": "outer_field_1",
4119 "type": [
4120 "null",
4121 {
4122 "type":"fixed",
4123 "name":"inner_fixed_name",
4124 "size": 16
4125 }
4126 ]
4127 },
4128 {
4129 "name": "outer_field_2",
4130 "type" : "inner_fixed_name"
4131 }
4132 ]
4133 }
4134 "#;
4135 let schema = Schema::parse_str(schema)?;
4136 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4137 assert_eq!(rs.get_names().len(), 2);
4138 for s in &["space.record_name", "space.inner_fixed_name"] {
4139 assert!(rs.get_names().contains_key(&Name::new(s)?));
4140 }
4141
4142 Ok(())
4143 }
4144
4145 #[test]
4146 fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() -> TestResult {
4147 let schema = r#"
4148 {
4149 "name": "record_name",
4150 "namespace": "space",
4151 "type": "record",
4152 "fields": [
4153 {
4154 "name": "outer_field_1",
4155 "type": [
4156 "null",
4157 {
4158 "type":"fixed",
4159 "name":"inner_fixed_name",
4160 "size": 16
4161 }
4162 ]
4163 },
4164 {
4165 "name": "outer_field_2",
4166 "type" : "space.inner_fixed_name"
4167 }
4168 ]
4169 }
4170 "#;
4171 let schema = Schema::parse_str(schema)?;
4172 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4173 assert_eq!(rs.get_names().len(), 2);
4174 for s in &["space.record_name", "space.inner_fixed_name"] {
4175 assert!(rs.get_names().contains_key(&Name::new(s)?));
4176 }
4177
4178 Ok(())
4179 }
4180
4181 #[test]
4182 fn avro_3448_test_proper_resolution_inner_record_inner_namespace() -> TestResult {
4183 let schema = r#"
4184 {
4185 "name": "record_name",
4186 "namespace": "space",
4187 "type": "record",
4188 "fields": [
4189 {
4190 "name": "outer_field_1",
4191 "type": [
4192 "null",
4193 {
4194 "type":"record",
4195 "name":"inner_record_name",
4196 "namespace":"inner_space",
4197 "fields":[
4198 {
4199 "name":"inner_field_1",
4200 "type":"double"
4201 }
4202 ]
4203 }
4204 ]
4205 },
4206 {
4207 "name": "outer_field_2",
4208 "type" : "inner_space.inner_record_name"
4209 }
4210 ]
4211 }
4212 "#;
4213 let schema = Schema::parse_str(schema)?;
4214 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4215 assert_eq!(rs.get_names().len(), 2);
4216 for s in &["space.record_name", "inner_space.inner_record_name"] {
4217 assert!(rs.get_names().contains_key(&Name::new(s)?));
4218 }
4219
4220 Ok(())
4221 }
4222
4223 #[test]
4224 fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() -> TestResult {
4225 let schema = r#"
4226 {
4227 "name": "record_name",
4228 "namespace": "space",
4229 "type": "record",
4230 "fields": [
4231 {
4232 "name": "outer_field_1",
4233 "type": [
4234 "null",
4235 {
4236 "type":"enum",
4237 "name":"inner_enum_name",
4238 "namespace": "inner_space",
4239 "symbols":["Extensive","Testing"]
4240 }
4241 ]
4242 },
4243 {
4244 "name": "outer_field_2",
4245 "type" : "inner_space.inner_enum_name"
4246 }
4247 ]
4248 }
4249 "#;
4250 let schema = Schema::parse_str(schema)?;
4251 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4252 assert_eq!(rs.get_names().len(), 2);
4253 for s in &["space.record_name", "inner_space.inner_enum_name"] {
4254 assert!(rs.get_names().contains_key(&Name::new(s)?));
4255 }
4256
4257 Ok(())
4258 }
4259
4260 #[test]
4261 fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() -> TestResult {
4262 let schema = r#"
4263 {
4264 "name": "record_name",
4265 "namespace": "space",
4266 "type": "record",
4267 "fields": [
4268 {
4269 "name": "outer_field_1",
4270 "type": [
4271 "null",
4272 {
4273 "type":"fixed",
4274 "name":"inner_fixed_name",
4275 "namespace": "inner_space",
4276 "size": 16
4277 }
4278 ]
4279 },
4280 {
4281 "name": "outer_field_2",
4282 "type" : "inner_space.inner_fixed_name"
4283 }
4284 ]
4285 }
4286 "#;
4287 let schema = Schema::parse_str(schema)?;
4288 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4289 assert_eq!(rs.get_names().len(), 2);
4290 for s in &["space.record_name", "inner_space.inner_fixed_name"] {
4291 assert!(rs.get_names().contains_key(&Name::new(s)?));
4292 }
4293
4294 Ok(())
4295 }
4296
4297 #[test]
4298 fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() -> TestResult {
4299 let schema = r#"
4300 {
4301 "name": "record_name",
4302 "namespace": "space",
4303 "type": "record",
4304 "fields": [
4305 {
4306 "name": "outer_field_1",
4307 "type": [
4308 "null",
4309 {
4310 "type":"record",
4311 "name":"middle_record_name",
4312 "fields":[
4313 {
4314 "name":"middle_field_1",
4315 "type":[
4316 "null",
4317 {
4318 "type":"record",
4319 "name":"inner_record_name",
4320 "fields":[
4321 {
4322 "name":"inner_field_1",
4323 "type":"double"
4324 }
4325 ]
4326 }
4327 ]
4328 }
4329 ]
4330 }
4331 ]
4332 },
4333 {
4334 "name": "outer_field_2",
4335 "type" : "space.inner_record_name"
4336 }
4337 ]
4338 }
4339 "#;
4340 let schema = Schema::parse_str(schema)?;
4341 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4342 assert_eq!(rs.get_names().len(), 3);
4343 for s in &[
4344 "space.record_name",
4345 "space.middle_record_name",
4346 "space.inner_record_name",
4347 ] {
4348 assert!(rs.get_names().contains_key(&Name::new(s)?));
4349 }
4350
4351 Ok(())
4352 }
4353
4354 #[test]
4355 fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() -> TestResult {
4356 let schema = r#"
4357 {
4358 "name": "record_name",
4359 "namespace": "space",
4360 "type": "record",
4361 "fields": [
4362 {
4363 "name": "outer_field_1",
4364 "type": [
4365 "null",
4366 {
4367 "type":"record",
4368 "name":"middle_record_name",
4369 "namespace":"middle_namespace",
4370 "fields":[
4371 {
4372 "name":"middle_field_1",
4373 "type":[
4374 "null",
4375 {
4376 "type":"record",
4377 "name":"inner_record_name",
4378 "fields":[
4379 {
4380 "name":"inner_field_1",
4381 "type":"double"
4382 }
4383 ]
4384 }
4385 ]
4386 }
4387 ]
4388 }
4389 ]
4390 },
4391 {
4392 "name": "outer_field_2",
4393 "type" : "middle_namespace.inner_record_name"
4394 }
4395 ]
4396 }
4397 "#;
4398 let schema = Schema::parse_str(schema)?;
4399 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4400 assert_eq!(rs.get_names().len(), 3);
4401 for s in &[
4402 "space.record_name",
4403 "middle_namespace.middle_record_name",
4404 "middle_namespace.inner_record_name",
4405 ] {
4406 assert!(rs.get_names().contains_key(&Name::new(s)?));
4407 }
4408
4409 Ok(())
4410 }
4411
4412 #[test]
4413 fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() -> TestResult {
4414 let schema = r#"
4415 {
4416 "name": "record_name",
4417 "namespace": "space",
4418 "type": "record",
4419 "fields": [
4420 {
4421 "name": "outer_field_1",
4422 "type": [
4423 "null",
4424 {
4425 "type":"record",
4426 "name":"middle_record_name",
4427 "namespace":"middle_namespace",
4428 "fields":[
4429 {
4430 "name":"middle_field_1",
4431 "type":[
4432 "null",
4433 {
4434 "type":"record",
4435 "name":"inner_record_name",
4436 "namespace":"inner_namespace",
4437 "fields":[
4438 {
4439 "name":"inner_field_1",
4440 "type":"double"
4441 }
4442 ]
4443 }
4444 ]
4445 }
4446 ]
4447 }
4448 ]
4449 },
4450 {
4451 "name": "outer_field_2",
4452 "type" : "inner_namespace.inner_record_name"
4453 }
4454 ]
4455 }
4456 "#;
4457 let schema = Schema::parse_str(schema)?;
4458 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4459 assert_eq!(rs.get_names().len(), 3);
4460 for s in &[
4461 "space.record_name",
4462 "middle_namespace.middle_record_name",
4463 "inner_namespace.inner_record_name",
4464 ] {
4465 assert!(rs.get_names().contains_key(&Name::new(s)?));
4466 }
4467
4468 Ok(())
4469 }
4470
4471 #[test]
4472 fn avro_3448_test_proper_in_array_resolution_inherited_namespace() -> TestResult {
4473 let schema = r#"
4474 {
4475 "name": "record_name",
4476 "namespace": "space",
4477 "type": "record",
4478 "fields": [
4479 {
4480 "name": "outer_field_1",
4481 "type": {
4482 "type":"array",
4483 "items":{
4484 "type":"record",
4485 "name":"in_array_record",
4486 "fields": [
4487 {
4488 "name":"array_record_field",
4489 "type":"string"
4490 }
4491 ]
4492 }
4493 }
4494 },
4495 {
4496 "name":"outer_field_2",
4497 "type":"in_array_record"
4498 }
4499 ]
4500 }
4501 "#;
4502 let schema = Schema::parse_str(schema)?;
4503 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4504 assert_eq!(rs.get_names().len(), 2);
4505 for s in &["space.record_name", "space.in_array_record"] {
4506 assert!(rs.get_names().contains_key(&Name::new(s)?));
4507 }
4508
4509 Ok(())
4510 }
4511
4512 #[test]
4513 fn avro_3448_test_proper_in_map_resolution_inherited_namespace() -> TestResult {
4514 let schema = r#"
4515 {
4516 "name": "record_name",
4517 "namespace": "space",
4518 "type": "record",
4519 "fields": [
4520 {
4521 "name": "outer_field_1",
4522 "type": {
4523 "type":"map",
4524 "values":{
4525 "type":"record",
4526 "name":"in_map_record",
4527 "fields": [
4528 {
4529 "name":"map_record_field",
4530 "type":"string"
4531 }
4532 ]
4533 }
4534 }
4535 },
4536 {
4537 "name":"outer_field_2",
4538 "type":"in_map_record"
4539 }
4540 ]
4541 }
4542 "#;
4543 let schema = Schema::parse_str(schema)?;
4544 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4545 assert_eq!(rs.get_names().len(), 2);
4546 for s in &["space.record_name", "space.in_map_record"] {
4547 assert!(rs.get_names().contains_key(&Name::new(s)?));
4548 }
4549
4550 Ok(())
4551 }
4552
4553 #[test]
4554 fn avro_3466_test_to_json_inner_enum_inner_namespace() -> TestResult {
4555 let schema = r#"
4556 {
4557 "name": "record_name",
4558 "namespace": "space",
4559 "type": "record",
4560 "fields": [
4561 {
4562 "name": "outer_field_1",
4563 "type": [
4564 "null",
4565 {
4566 "type":"enum",
4567 "name":"inner_enum_name",
4568 "namespace": "inner_space",
4569 "symbols":["Extensive","Testing"]
4570 }
4571 ]
4572 },
4573 {
4574 "name": "outer_field_2",
4575 "type" : "inner_space.inner_enum_name"
4576 }
4577 ]
4578 }
4579 "#;
4580 let schema = Schema::parse_str(schema)?;
4581 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4582
4583 assert_eq!(rs.get_names().len(), 2);
4585 for s in &["space.record_name", "inner_space.inner_enum_name"] {
4586 assert!(rs.get_names().contains_key(&Name::new(s)?));
4587 }
4588
4589 let schema_str = serde_json::to_string(&schema).expect("test failed");
4591 let _schema = Schema::parse_str(&schema_str).expect("test failed");
4592 assert_eq!(schema, _schema);
4593
4594 Ok(())
4595 }
4596
4597 #[test]
4598 fn avro_3466_test_to_json_inner_fixed_inner_namespace() -> TestResult {
4599 let schema = r#"
4600 {
4601 "name": "record_name",
4602 "namespace": "space",
4603 "type": "record",
4604 "fields": [
4605 {
4606 "name": "outer_field_1",
4607 "type": [
4608 "null",
4609 {
4610 "type":"fixed",
4611 "name":"inner_fixed_name",
4612 "namespace": "inner_space",
4613 "size":54
4614 }
4615 ]
4616 },
4617 {
4618 "name": "outer_field_2",
4619 "type" : "inner_space.inner_fixed_name"
4620 }
4621 ]
4622 }
4623 "#;
4624 let schema = Schema::parse_str(schema)?;
4625 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4626
4627 assert_eq!(rs.get_names().len(), 2);
4629 for s in &["space.record_name", "inner_space.inner_fixed_name"] {
4630 assert!(rs.get_names().contains_key(&Name::new(s)?));
4631 }
4632
4633 let schema_str = serde_json::to_string(&schema).expect("test failed");
4635 let _schema = Schema::parse_str(&schema_str).expect("test failed");
4636 assert_eq!(schema, _schema);
4637
4638 Ok(())
4639 }
4640
4641 fn assert_avro_3512_aliases(aliases: &Aliases) {
4642 match aliases {
4643 Some(aliases) => {
4644 assert_eq!(aliases.len(), 3);
4645 assert_eq!(aliases[0], Alias::new("space.b").unwrap());
4646 assert_eq!(aliases[1], Alias::new("x.y").unwrap());
4647 assert_eq!(aliases[2], Alias::new(".c").unwrap());
4648 }
4649 None => {
4650 panic!("'aliases' must be Some");
4651 }
4652 }
4653 }
4654
4655 #[test]
4656 fn avro_3512_alias_with_null_namespace_record() -> TestResult {
4657 let schema = Schema::parse_str(
4658 r#"
4659 {
4660 "type": "record",
4661 "name": "a",
4662 "namespace": "space",
4663 "aliases": ["b", "x.y", ".c"],
4664 "fields" : [
4665 {"name": "time", "type": "long"}
4666 ]
4667 }
4668 "#,
4669 )?;
4670
4671 if let Schema::Record(RecordSchema { ref aliases, .. }) = schema {
4672 assert_avro_3512_aliases(aliases);
4673 } else {
4674 panic!("The Schema should be a record: {schema:?}");
4675 }
4676
4677 Ok(())
4678 }
4679
4680 #[test]
4681 fn avro_3512_alias_with_null_namespace_enum() -> TestResult {
4682 let schema = Schema::parse_str(
4683 r#"
4684 {
4685 "type": "enum",
4686 "name": "a",
4687 "namespace": "space",
4688 "aliases": ["b", "x.y", ".c"],
4689 "symbols" : [
4690 "symbol1", "symbol2"
4691 ]
4692 }
4693 "#,
4694 )?;
4695
4696 if let Schema::Enum(EnumSchema { ref aliases, .. }) = schema {
4697 assert_avro_3512_aliases(aliases);
4698 } else {
4699 panic!("The Schema should be an enum: {schema:?}");
4700 }
4701
4702 Ok(())
4703 }
4704
4705 #[test]
4706 fn avro_3512_alias_with_null_namespace_fixed() -> TestResult {
4707 let schema = Schema::parse_str(
4708 r#"
4709 {
4710 "type": "fixed",
4711 "name": "a",
4712 "namespace": "space",
4713 "aliases": ["b", "x.y", ".c"],
4714 "size" : 12
4715 }
4716 "#,
4717 )?;
4718
4719 if let Schema::Fixed(FixedSchema { ref aliases, .. }) = schema {
4720 assert_avro_3512_aliases(aliases);
4721 } else {
4722 panic!("The Schema should be a fixed: {schema:?}");
4723 }
4724
4725 Ok(())
4726 }
4727
4728 #[test]
4729 fn avro_3518_serialize_aliases_record() -> TestResult {
4730 let schema = Schema::parse_str(
4731 r#"
4732 {
4733 "type": "record",
4734 "name": "a",
4735 "namespace": "space",
4736 "aliases": ["b", "x.y", ".c"],
4737 "fields" : [
4738 {
4739 "name": "time",
4740 "type": "long",
4741 "doc": "The documentation is not serialized",
4742 "default": 123,
4743 "aliases": ["time1", "ns.time2"]
4744 }
4745 ]
4746 }
4747 "#,
4748 )?;
4749
4750 let value = serde_json::to_value(&schema)?;
4751 let serialized = serde_json::to_string(&value)?;
4752 assert_eq!(
4753 r#"{"aliases":["space.b","x.y","c"],"fields":[{"aliases":["time1","ns.time2"],"default":123,"name":"time","type":"long"}],"name":"a","namespace":"space","type":"record"}"#,
4754 &serialized
4755 );
4756 assert_eq!(schema, Schema::parse_str(&serialized)?);
4757
4758 Ok(())
4759 }
4760
4761 #[test]
4762 fn avro_3518_serialize_aliases_enum() -> TestResult {
4763 let schema = Schema::parse_str(
4764 r#"
4765 {
4766 "type": "enum",
4767 "name": "a",
4768 "namespace": "space",
4769 "aliases": ["b", "x.y", ".c"],
4770 "symbols" : [
4771 "symbol1", "symbol2"
4772 ]
4773 }
4774 "#,
4775 )?;
4776
4777 let value = serde_json::to_value(&schema)?;
4778 let serialized = serde_json::to_string(&value)?;
4779 assert_eq!(
4780 r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","symbols":["symbol1","symbol2"],"type":"enum"}"#,
4781 &serialized
4782 );
4783 assert_eq!(schema, Schema::parse_str(&serialized)?);
4784
4785 Ok(())
4786 }
4787
4788 #[test]
4789 fn avro_3518_serialize_aliases_fixed() -> TestResult {
4790 let schema = Schema::parse_str(
4791 r#"
4792 {
4793 "type": "fixed",
4794 "name": "a",
4795 "namespace": "space",
4796 "aliases": ["b", "x.y", ".c"],
4797 "size" : 12
4798 }
4799 "#,
4800 )?;
4801
4802 let value = serde_json::to_value(&schema)?;
4803 let serialized = serde_json::to_string(&value)?;
4804 assert_eq!(
4805 r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","size":12,"type":"fixed"}"#,
4806 &serialized
4807 );
4808 assert_eq!(schema, Schema::parse_str(&serialized)?);
4809
4810 Ok(())
4811 }
4812
4813 #[test]
4814 fn avro_3130_parse_anonymous_union_type() -> TestResult {
4815 let schema_str = r#"
4816 {
4817 "type": "record",
4818 "name": "AccountEvent",
4819 "fields": [
4820 {"type":
4821 ["null",
4822 { "name": "accountList",
4823 "type": {
4824 "type": "array",
4825 "items": "long"
4826 }
4827 }
4828 ],
4829 "name":"NullableLongArray"
4830 }
4831 ]
4832 }
4833 "#;
4834 let schema = Schema::parse_str(schema_str)?;
4835
4836 if let Schema::Record(RecordSchema { name, fields, .. }) = schema {
4837 assert_eq!(name, Name::new("AccountEvent")?);
4838
4839 let field = &fields[0];
4840 assert_eq!(&field.name, "NullableLongArray");
4841
4842 if let Schema::Union(ref union) = field.schema {
4843 assert_eq!(union.schemas[0], Schema::Null);
4844
4845 if let Schema::Array(ref array_schema) = union.schemas[1] {
4846 if let Schema::Long = *array_schema.items {
4847 } else {
4849 panic!("Expected a Schema::Array of type Long");
4850 }
4851 } else {
4852 panic!("Expected Schema::Array");
4853 }
4854 } else {
4855 panic!("Expected Schema::Union");
4856 }
4857 } else {
4858 panic!("Expected Schema::Record");
4859 }
4860
4861 Ok(())
4862 }
4863
4864 #[test]
4865 fn avro_custom_attributes_schema_without_attributes() -> TestResult {
4866 let schemata_str = [
4867 r#"
4868 {
4869 "type": "record",
4870 "name": "Rec",
4871 "doc": "A Record schema without custom attributes",
4872 "fields": []
4873 }
4874 "#,
4875 r#"
4876 {
4877 "type": "enum",
4878 "name": "Enum",
4879 "doc": "An Enum schema without custom attributes",
4880 "symbols": []
4881 }
4882 "#,
4883 r#"
4884 {
4885 "type": "fixed",
4886 "name": "Fixed",
4887 "doc": "A Fixed schema without custom attributes",
4888 "size": 0
4889 }
4890 "#,
4891 ];
4892 for schema_str in schemata_str.iter() {
4893 let schema = Schema::parse_str(schema_str)?;
4894 assert_eq!(schema.custom_attributes(), Some(&Default::default()));
4895 }
4896
4897 Ok(())
4898 }
4899
4900 const CUSTOM_ATTRS_SUFFIX: &str = r#"
4901 "string_key": "value",
4902 "number_key": 1.23,
4903 "null_key": null,
4904 "array_key": [1, 2, 3],
4905 "object_key": {
4906 "key": "value"
4907 }
4908 "#;
4909
4910 #[test]
4911 fn avro_3609_custom_attributes_schema_with_attributes() -> TestResult {
4912 let schemata_str = [
4913 r#"
4914 {
4915 "type": "record",
4916 "name": "Rec",
4917 "namespace": "ns",
4918 "doc": "A Record schema with custom attributes",
4919 "fields": [],
4920 {{{}}}
4921 }
4922 "#,
4923 r#"
4924 {
4925 "type": "enum",
4926 "name": "Enum",
4927 "namespace": "ns",
4928 "doc": "An Enum schema with custom attributes",
4929 "symbols": [],
4930 {{{}}}
4931 }
4932 "#,
4933 r#"
4934 {
4935 "type": "fixed",
4936 "name": "Fixed",
4937 "namespace": "ns",
4938 "doc": "A Fixed schema with custom attributes",
4939 "size": 2,
4940 {{{}}}
4941 }
4942 "#,
4943 ];
4944
4945 for schema_str in schemata_str.iter() {
4946 let schema = Schema::parse_str(
4947 schema_str
4948 .to_owned()
4949 .replace("{{{}}}", CUSTOM_ATTRS_SUFFIX)
4950 .as_str(),
4951 )?;
4952
4953 assert_eq!(
4954 schema.custom_attributes(),
4955 Some(&expected_custom_attributes())
4956 );
4957 }
4958
4959 Ok(())
4960 }
4961
4962 fn expected_custom_attributes() -> BTreeMap<String, Value> {
4963 let mut expected_attributes: BTreeMap<String, Value> = Default::default();
4964 expected_attributes.insert("string_key".to_string(), Value::String("value".to_string()));
4965 expected_attributes.insert("number_key".to_string(), json!(1.23));
4966 expected_attributes.insert("null_key".to_string(), Value::Null);
4967 expected_attributes.insert(
4968 "array_key".to_string(),
4969 Value::Array(vec![json!(1), json!(2), json!(3)]),
4970 );
4971 let mut object_value: HashMap<String, Value> = HashMap::new();
4972 object_value.insert("key".to_string(), Value::String("value".to_string()));
4973 expected_attributes.insert("object_key".to_string(), json!(object_value));
4974 expected_attributes
4975 }
4976
4977 #[test]
4978 fn avro_3609_custom_attributes_record_field_without_attributes() -> TestResult {
4979 let schema_str = String::from(
4980 r#"
4981 {
4982 "type": "record",
4983 "name": "Rec",
4984 "doc": "A Record schema without custom attributes",
4985 "fields": [
4986 {
4987 "name": "field_one",
4988 "type": "float",
4989 {{{}}}
4990 }
4991 ]
4992 }
4993 "#,
4994 );
4995
4996 let schema = Schema::parse_str(schema_str.replace("{{{}}}", CUSTOM_ATTRS_SUFFIX).as_str())?;
4997
4998 match schema {
4999 Schema::Record(RecordSchema { name, fields, .. }) => {
5000 assert_eq!(name, Name::new("Rec")?);
5001 assert_eq!(fields.len(), 1);
5002 let field = &fields[0];
5003 assert_eq!(&field.name, "field_one");
5004 assert_eq!(field.custom_attributes, expected_custom_attributes());
5005 }
5006 _ => panic!("Expected Schema::Record"),
5007 }
5008
5009 Ok(())
5010 }
5011
5012 #[test]
5013 fn avro_3625_null_is_first() -> TestResult {
5014 let schema_str = String::from(
5015 r#"
5016 {
5017 "type": "record",
5018 "name": "union_schema_test",
5019 "fields": [
5020 {"name": "a", "type": ["null", "long"], "default": null}
5021 ]
5022 }
5023 "#,
5024 );
5025
5026 let schema = Schema::parse_str(&schema_str)?;
5027
5028 match schema {
5029 Schema::Record(RecordSchema { name, fields, .. }) => {
5030 assert_eq!(name, Name::new("union_schema_test")?);
5031 assert_eq!(fields.len(), 1);
5032 let field = &fields[0];
5033 assert_eq!(&field.name, "a");
5034 assert_eq!(&field.default, &Some(Value::Null));
5035 match &field.schema {
5036 Schema::Union(union) => {
5037 assert_eq!(union.variants().len(), 2);
5038 assert!(union.is_nullable());
5039 assert_eq!(union.variants()[0], Schema::Null);
5040 assert_eq!(union.variants()[1], Schema::Long);
5041 }
5042 _ => panic!("Expected Schema::Union"),
5043 }
5044 }
5045 _ => panic!("Expected Schema::Record"),
5046 }
5047
5048 Ok(())
5049 }
5050
5051 #[test]
5052 fn avro_3625_null_is_last() -> TestResult {
5053 let schema_str = String::from(
5054 r#"
5055 {
5056 "type": "record",
5057 "name": "union_schema_test",
5058 "fields": [
5059 {"name": "a", "type": ["long","null"], "default": 123}
5060 ]
5061 }
5062 "#,
5063 );
5064
5065 let schema = Schema::parse_str(&schema_str)?;
5066
5067 match schema {
5068 Schema::Record(RecordSchema { name, fields, .. }) => {
5069 assert_eq!(name, Name::new("union_schema_test")?);
5070 assert_eq!(fields.len(), 1);
5071 let field = &fields[0];
5072 assert_eq!(&field.name, "a");
5073 assert_eq!(&field.default, &Some(json!(123)));
5074 match &field.schema {
5075 Schema::Union(union) => {
5076 assert_eq!(union.variants().len(), 2);
5077 assert_eq!(union.variants()[0], Schema::Long);
5078 assert_eq!(union.variants()[1], Schema::Null);
5079 }
5080 _ => panic!("Expected Schema::Union"),
5081 }
5082 }
5083 _ => panic!("Expected Schema::Record"),
5084 }
5085
5086 Ok(())
5087 }
5088
5089 #[test]
5090 fn avro_3625_null_is_the_middle() -> TestResult {
5091 let schema_str = String::from(
5092 r#"
5093 {
5094 "type": "record",
5095 "name": "union_schema_test",
5096 "fields": [
5097 {"name": "a", "type": ["long","null","int"], "default": 123}
5098 ]
5099 }
5100 "#,
5101 );
5102
5103 let schema = Schema::parse_str(&schema_str)?;
5104
5105 match schema {
5106 Schema::Record(RecordSchema { name, fields, .. }) => {
5107 assert_eq!(name, Name::new("union_schema_test")?);
5108 assert_eq!(fields.len(), 1);
5109 let field = &fields[0];
5110 assert_eq!(&field.name, "a");
5111 assert_eq!(&field.default, &Some(json!(123)));
5112 match &field.schema {
5113 Schema::Union(union) => {
5114 assert_eq!(union.variants().len(), 3);
5115 assert_eq!(union.variants()[0], Schema::Long);
5116 assert_eq!(union.variants()[1], Schema::Null);
5117 assert_eq!(union.variants()[2], Schema::Int);
5118 }
5119 _ => panic!("Expected Schema::Union"),
5120 }
5121 }
5122 _ => panic!("Expected Schema::Record"),
5123 }
5124
5125 Ok(())
5126 }
5127
5128 #[test]
5129 fn avro_3649_default_notintfirst() -> TestResult {
5130 let schema_str = String::from(
5131 r#"
5132 {
5133 "type": "record",
5134 "name": "union_schema_test",
5135 "fields": [
5136 {"name": "a", "type": ["string", "int"], "default": 123}
5137 ]
5138 }
5139 "#,
5140 );
5141
5142 let schema = Schema::parse_str(&schema_str)?;
5143
5144 match schema {
5145 Schema::Record(RecordSchema { name, fields, .. }) => {
5146 assert_eq!(name, Name::new("union_schema_test")?);
5147 assert_eq!(fields.len(), 1);
5148 let field = &fields[0];
5149 assert_eq!(&field.name, "a");
5150 assert_eq!(&field.default, &Some(json!(123)));
5151 match &field.schema {
5152 Schema::Union(union) => {
5153 assert_eq!(union.variants().len(), 2);
5154 assert_eq!(union.variants()[0], Schema::String);
5155 assert_eq!(union.variants()[1], Schema::Int);
5156 }
5157 _ => panic!("Expected Schema::Union"),
5158 }
5159 }
5160 _ => panic!("Expected Schema::Record"),
5161 }
5162
5163 Ok(())
5164 }
5165
5166 #[test]
5167 fn avro_3709_parsing_of_record_field_aliases() -> TestResult {
5168 let schema = r#"
5169 {
5170 "name": "rec",
5171 "type": "record",
5172 "fields": [
5173 {
5174 "name": "num",
5175 "type": "int",
5176 "aliases": ["num1", "num2"]
5177 }
5178 ]
5179 }
5180 "#;
5181
5182 let schema = Schema::parse_str(schema)?;
5183 if let Schema::Record(RecordSchema { fields, .. }) = schema {
5184 let num_field = &fields[0];
5185 assert_eq!(num_field.name, "num");
5186 assert_eq!(num_field.aliases, Some(vec!("num1".into(), "num2".into())));
5187 } else {
5188 panic!("Expected a record schema!");
5189 }
5190
5191 Ok(())
5192 }
5193
5194 #[test]
5195 fn avro_3735_parse_enum_namespace() -> TestResult {
5196 let schema = r#"
5197 {
5198 "type": "record",
5199 "name": "Foo",
5200 "namespace": "name.space",
5201 "fields":
5202 [
5203 {
5204 "name": "barInit",
5205 "type":
5206 {
5207 "type": "enum",
5208 "name": "Bar",
5209 "symbols":
5210 [
5211 "bar0",
5212 "bar1"
5213 ]
5214 }
5215 },
5216 {
5217 "name": "barUse",
5218 "type": "Bar"
5219 }
5220 ]
5221 }
5222 "#;
5223
5224 #[derive(
5225 Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
5226 )]
5227 pub enum Bar {
5228 #[serde(rename = "bar0")]
5229 Bar0,
5230 #[serde(rename = "bar1")]
5231 Bar1,
5232 }
5233
5234 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
5235 pub struct Foo {
5236 #[serde(rename = "barInit")]
5237 pub bar_init: Bar,
5238 #[serde(rename = "barUse")]
5239 pub bar_use: Bar,
5240 }
5241
5242 let schema = Schema::parse_str(schema)?;
5243
5244 let foo = Foo {
5245 bar_init: Bar::Bar0,
5246 bar_use: Bar::Bar1,
5247 };
5248
5249 let avro_value = crate::to_value(foo)?;
5250 assert!(avro_value.validate(&schema));
5251
5252 let mut writer = crate::Writer::new(&schema, Vec::new())?;
5253
5254 writer.append(avro_value)?;
5256
5257 Ok(())
5258 }
5259
5260 #[test]
5261 fn avro_3755_deserialize() -> TestResult {
5262 #[derive(
5263 Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
5264 )]
5265 pub enum Bar {
5266 #[serde(rename = "bar0")]
5267 Bar0,
5268 #[serde(rename = "bar1")]
5269 Bar1,
5270 #[serde(rename = "bar2")]
5271 Bar2,
5272 }
5273
5274 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
5275 pub struct Foo {
5276 #[serde(rename = "barInit")]
5277 pub bar_init: Bar,
5278 #[serde(rename = "barUse")]
5279 pub bar_use: Bar,
5280 }
5281
5282 let writer_schema = r#"{
5283 "type": "record",
5284 "name": "Foo",
5285 "fields":
5286 [
5287 {
5288 "name": "barInit",
5289 "type":
5290 {
5291 "type": "enum",
5292 "name": "Bar",
5293 "symbols":
5294 [
5295 "bar0",
5296 "bar1"
5297 ]
5298 }
5299 },
5300 {
5301 "name": "barUse",
5302 "type": "Bar"
5303 }
5304 ]
5305 }"#;
5306
5307 let reader_schema = r#"{
5308 "type": "record",
5309 "name": "Foo",
5310 "namespace": "name.space",
5311 "fields":
5312 [
5313 {
5314 "name": "barInit",
5315 "type":
5316 {
5317 "type": "enum",
5318 "name": "Bar",
5319 "symbols":
5320 [
5321 "bar0",
5322 "bar1",
5323 "bar2"
5324 ]
5325 }
5326 },
5327 {
5328 "name": "barUse",
5329 "type": "Bar"
5330 }
5331 ]
5332 }"#;
5333
5334 let writer_schema = Schema::parse_str(writer_schema)?;
5335 let foo = Foo {
5336 bar_init: Bar::Bar0,
5337 bar_use: Bar::Bar1,
5338 };
5339 let avro_value = crate::to_value(foo)?;
5340 assert!(
5341 avro_value.validate(&writer_schema),
5342 "value is valid for schema",
5343 );
5344 let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5345 let mut x = &datum[..];
5346 let reader_schema = Schema::parse_str(reader_schema)?;
5347 let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5348 match deser_value {
5349 types::Value::Record(fields) => {
5350 assert_eq!(fields.len(), 2);
5351 assert_eq!(fields[0].0, "barInit");
5352 assert_eq!(fields[0].1, types::Value::Enum(0, "bar0".to_string()));
5353 assert_eq!(fields[1].0, "barUse");
5354 assert_eq!(fields[1].1, types::Value::Enum(1, "bar1".to_string()));
5355 }
5356 _ => panic!("Expected Value::Record"),
5357 }
5358
5359 Ok(())
5360 }
5361
5362 #[test]
5363 fn test_avro_3780_decimal_schema_type_with_fixed() -> TestResult {
5364 let schema = json!(
5365 {
5366 "type": "record",
5367 "name": "recordWithDecimal",
5368 "fields": [
5369 {
5370 "name": "decimal",
5371 "type": "fixed",
5372 "name": "nestedFixed",
5373 "size": 8,
5374 "logicalType": "decimal",
5375 "precision": 4
5376 }
5377 ]
5378 });
5379
5380 let parse_result = Schema::parse(&schema);
5381 assert!(
5382 parse_result.is_ok(),
5383 "parse result must be ok, got: {parse_result:?}"
5384 );
5385
5386 Ok(())
5387 }
5388
5389 #[test]
5390 fn test_avro_3772_enum_default_wrong_type() -> TestResult {
5391 let schema = r#"
5392 {
5393 "type": "record",
5394 "name": "test",
5395 "fields": [
5396 {"name": "a", "type": "long", "default": 42},
5397 {"name": "b", "type": "string"},
5398 {
5399 "name": "c",
5400 "type": {
5401 "type": "enum",
5402 "name": "suit",
5403 "symbols": ["diamonds", "spades", "clubs", "hearts"],
5404 "default": 123
5405 }
5406 }
5407 ]
5408 }
5409 "#;
5410
5411 match Schema::parse_str(schema) {
5412 Err(err) => {
5413 assert_eq!(
5414 err.to_string(),
5415 "Default value for enum must be a string! Got: 123"
5416 );
5417 }
5418 _ => panic!("Expected an error"),
5419 }
5420 Ok(())
5421 }
5422
5423 #[test]
5424 fn test_avro_3812_handle_null_namespace_properly() -> TestResult {
5425 let schema_str = r#"
5426 {
5427 "namespace": "",
5428 "type": "record",
5429 "name": "my_schema",
5430 "fields": [
5431 {
5432 "name": "a",
5433 "type": {
5434 "type": "enum",
5435 "name": "my_enum",
5436 "namespace": "",
5437 "symbols": ["a", "b"]
5438 }
5439 }, {
5440 "name": "b",
5441 "type": {
5442 "type": "fixed",
5443 "name": "my_fixed",
5444 "namespace": "",
5445 "size": 10
5446 }
5447 }
5448 ]
5449 }
5450 "#;
5451
5452 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"a","type":{"name":"my_enum","type":"enum","symbols":["a","b"]}},{"name":"b","type":{"name":"my_fixed","type":"fixed","size":10}}]}"#;
5453 let schema = Schema::parse_str(schema_str)?;
5454 let canonical_form = schema.canonical_form();
5455 assert_eq!(canonical_form, expected);
5456
5457 let name = Name::new("my_name")?;
5458 let fullname = name.fullname(Some("".to_string()));
5459 assert_eq!(fullname, "my_name");
5460 let qname = name.fully_qualified_name(&Some("".to_string())).to_string();
5461 assert_eq!(qname, "my_name");
5462
5463 Ok(())
5464 }
5465
5466 #[test]
5467 fn test_avro_3818_inherit_enclosing_namespace() -> TestResult {
5468 let schema_str = r#"
5470 {
5471 "namespace": "my_ns",
5472 "type": "record",
5473 "name": "my_schema",
5474 "fields": [
5475 {
5476 "name": "f1",
5477 "type": {
5478 "name": "enum1",
5479 "type": "enum",
5480 "symbols": ["a"]
5481 }
5482 }, {
5483 "name": "f2",
5484 "type": {
5485 "name": "fixed1",
5486 "type": "fixed",
5487 "size": 1
5488 }
5489 }
5490 ]
5491 }
5492 "#;
5493
5494 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"my_ns.fixed1","type":"fixed","size":1}}]}"#;
5495 let schema = Schema::parse_str(schema_str)?;
5496 let canonical_form = schema.canonical_form();
5497 assert_eq!(canonical_form, expected);
5498
5499 let schema_str = r#"
5502 {
5503 "namespace": "my_ns",
5504 "type": "record",
5505 "name": "my_schema",
5506 "fields": [
5507 {
5508 "name": "f1",
5509 "type": {
5510 "name": "enum1",
5511 "type": "enum",
5512 "namespace": "",
5513 "symbols": ["a"]
5514 }
5515 }, {
5516 "name": "f2",
5517 "type": {
5518 "name": "fixed1",
5519 "type": "fixed",
5520 "namespace": "",
5521 "size": 1
5522 }
5523 }
5524 ]
5525 }
5526 "#;
5527
5528 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fixed1","type":"fixed","size":1}}]}"#;
5529 let schema = Schema::parse_str(schema_str)?;
5530 let canonical_form = schema.canonical_form();
5531 assert_eq!(canonical_form, expected);
5532
5533 let schema_str = r#"
5535 {
5536 "namespace": "",
5537 "type": "record",
5538 "name": "my_schema",
5539 "fields": [
5540 {
5541 "name": "f1",
5542 "type": {
5543 "name": "enum1",
5544 "type": "enum",
5545 "namespace": "f1.ns",
5546 "symbols": ["a"]
5547 }
5548 }, {
5549 "name": "f2",
5550 "type": {
5551 "name": "f2.ns.fixed1",
5552 "type": "fixed",
5553 "size": 1
5554 }
5555 }
5556 ]
5557 }
5558 "#;
5559
5560 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"f1.ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"f2.ns.fixed1","type":"fixed","size":1}}]}"#;
5561 let schema = Schema::parse_str(schema_str)?;
5562 let canonical_form = schema.canonical_form();
5563 assert_eq!(canonical_form, expected);
5564
5565 let schema_str = r#"
5567 {
5568 "type": "record",
5569 "name": "my_ns.my_schema",
5570 "fields": [
5571 {
5572 "name": "f1",
5573 "type": {
5574 "name": "inner_record1",
5575 "type": "record",
5576 "fields": [
5577 {
5578 "name": "f1_1",
5579 "type": {
5580 "name": "enum1",
5581 "type": "enum",
5582 "symbols": ["a"]
5583 }
5584 }
5585 ]
5586 }
5587 }, {
5588 "name": "f2",
5589 "type": {
5590 "name": "inner_record2",
5591 "type": "record",
5592 "namespace": "inner_ns",
5593 "fields": [
5594 {
5595 "name": "f2_1",
5596 "type": {
5597 "name": "enum2",
5598 "type": "enum",
5599 "symbols": ["a"]
5600 }
5601 }
5602 ]
5603 }
5604 }
5605 ]
5606 }
5607 "#;
5608
5609 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.inner_record1","type":"record","fields":[{"name":"f1_1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}}]}},{"name":"f2","type":{"name":"inner_ns.inner_record2","type":"record","fields":[{"name":"f2_1","type":{"name":"inner_ns.enum2","type":"enum","symbols":["a"]}}]}}]}"#;
5610 let schema = Schema::parse_str(schema_str)?;
5611 let canonical_form = schema.canonical_form();
5612 assert_eq!(canonical_form, expected);
5613
5614 Ok(())
5615 }
5616
5617 #[test]
5618 fn test_avro_3779_bigdecimal_schema() -> TestResult {
5619 let schema = json!(
5620 {
5621 "name": "decimal",
5622 "type": "bytes",
5623 "logicalType": "big-decimal"
5624 }
5625 );
5626
5627 let parse_result = Schema::parse(&schema);
5628 assert!(
5629 parse_result.is_ok(),
5630 "parse result must be ok, got: {parse_result:?}"
5631 );
5632 match parse_result? {
5633 Schema::BigDecimal => (),
5634 other => panic!("Expected Schema::BigDecimal but got: {other:?}"),
5635 }
5636
5637 Ok(())
5638 }
5639
5640 #[test]
5641 fn test_avro_3820_deny_invalid_field_names() -> TestResult {
5642 let schema_str = r#"
5643 {
5644 "name": "my_record",
5645 "type": "record",
5646 "fields": [
5647 {
5648 "name": "f1.x",
5649 "type": {
5650 "name": "my_enum",
5651 "type": "enum",
5652 "symbols": ["a"]
5653 }
5654 }, {
5655 "name": "f2",
5656 "type": {
5657 "name": "my_fixed",
5658 "type": "fixed",
5659 "size": 1
5660 }
5661 }
5662 ]
5663 }
5664 "#;
5665
5666 match Schema::parse_str(schema_str).map_err(Error::into_details) {
5667 Err(Details::FieldName(x)) if x == "f1.x" => Ok(()),
5668 other => Err(format!("Expected Details::FieldName, got {other:?}").into()),
5669 }
5670 }
5671
5672 #[test]
5673 fn test_avro_3827_disallow_duplicate_field_names() -> TestResult {
5674 let schema_str = r#"
5675 {
5676 "name": "my_schema",
5677 "type": "record",
5678 "fields": [
5679 {
5680 "name": "f1",
5681 "type": {
5682 "name": "a",
5683 "type": "record",
5684 "fields": []
5685 }
5686 }, {
5687 "name": "f1",
5688 "type": {
5689 "name": "b",
5690 "type": "record",
5691 "fields": []
5692 }
5693 }
5694 ]
5695 }
5696 "#;
5697
5698 match Schema::parse_str(schema_str).map_err(Error::into_details) {
5699 Err(Details::FieldNameDuplicate(_)) => (),
5700 other => {
5701 return Err(format!("Expected Details::FieldNameDuplicate, got {other:?}").into());
5702 }
5703 };
5704
5705 let schema_str = r#"
5706 {
5707 "name": "my_schema",
5708 "type": "record",
5709 "fields": [
5710 {
5711 "name": "f1",
5712 "type": {
5713 "name": "a",
5714 "type": "record",
5715 "fields": [
5716 {
5717 "name": "f1",
5718 "type": {
5719 "name": "b",
5720 "type": "record",
5721 "fields": []
5722 }
5723 }
5724 ]
5725 }
5726 }
5727 ]
5728 }
5729 "#;
5730
5731 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"a","type":"record","fields":[{"name":"f1","type":{"name":"b","type":"record","fields":[]}}]}}]}"#;
5732 let schema = Schema::parse_str(schema_str)?;
5733 let canonical_form = schema.canonical_form();
5734 assert_eq!(canonical_form, expected);
5735
5736 Ok(())
5737 }
5738
5739 #[test]
5740 fn test_avro_3830_null_namespace_in_fully_qualified_names() -> TestResult {
5741 let schema_str = r#"
5744 {
5745 "name": ".record1",
5746 "namespace": "ns1",
5747 "type": "record",
5748 "fields": [
5749 {
5750 "name": "f1",
5751 "type": {
5752 "name": ".enum1",
5753 "namespace": "ns2",
5754 "type": "enum",
5755 "symbols": ["a"]
5756 }
5757 }, {
5758 "name": "f2",
5759 "type": {
5760 "name": ".fxed1",
5761 "namespace": "ns3",
5762 "type": "fixed",
5763 "size": 1
5764 }
5765 }
5766 ]
5767 }
5768 "#;
5769
5770 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5771 let schema = Schema::parse_str(schema_str)?;
5772 let canonical_form = schema.canonical_form();
5773 assert_eq!(canonical_form, expected);
5774
5775 let schema_str = r#"
5777 {
5778 "name": ".record1",
5779 "namespace": "ns1",
5780 "type": "record",
5781 "fields": [
5782 {
5783 "name": "f1",
5784 "type": {
5785 "name": "enum1",
5786 "type": "enum",
5787 "symbols": ["a"]
5788 }
5789 }, {
5790 "name": "f2",
5791 "type": {
5792 "name": "fxed1",
5793 "type": "fixed",
5794 "size": 1
5795 }
5796 }
5797 ]
5798 }
5799 "#;
5800
5801 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5802 let schema = Schema::parse_str(schema_str)?;
5803 let canonical_form = schema.canonical_form();
5804 assert_eq!(canonical_form, expected);
5805
5806 let name = Name::new(".my_name")?;
5807 let fullname = name.fullname(None);
5808 assert_eq!(fullname, "my_name");
5809 let qname = name.fully_qualified_name(&None).to_string();
5810 assert_eq!(qname, "my_name");
5811
5812 Ok(())
5813 }
5814
5815 #[test]
5816 fn test_avro_3814_schema_resolution_failure() -> TestResult {
5817 let reader_schema = json!(
5819 {
5820 "type": "record",
5821 "name": "MyOuterRecord",
5822 "fields": [
5823 {
5824 "name": "inner_record",
5825 "type": [
5826 "null",
5827 {
5828 "type": "record",
5829 "name": "MyRecord",
5830 "fields": [
5831 {"name": "a", "type": "string"}
5832 ]
5833 }
5834 ],
5835 "default": null
5836 }
5837 ]
5838 }
5839 );
5840
5841 let writer_schema = json!(
5844 {
5845 "type": "record",
5846 "name": "MyOuterRecord",
5847 "fields": [
5848 {
5849 "name": "inner_record",
5850 "type": [
5851 "null",
5852 {
5853 "type": "record",
5854 "name": "MyRecord",
5855 "fields": [
5856 {"name": "a", "type": "string"},
5857 {
5858 "name": "b",
5859 "type": [
5860 "null",
5861 {
5862 "type": "enum",
5863 "name": "MyEnum",
5864 "symbols": ["A", "B", "C"],
5865 "default": "C"
5866 }
5867 ],
5868 "default": null
5869 },
5870 ]
5871 }
5872 ]
5873 }
5874 ],
5875 "default": null
5876 }
5877 );
5878
5879 #[derive(Serialize, Deserialize, Debug)]
5882 struct MyInnerRecordReader {
5883 a: String,
5884 }
5885
5886 #[derive(Serialize, Deserialize, Debug)]
5887 struct MyRecordReader {
5888 inner_record: Option<MyInnerRecordReader>,
5889 }
5890
5891 #[derive(Serialize, Deserialize, Debug)]
5892 enum MyEnum {
5893 A,
5894 B,
5895 C,
5896 }
5897
5898 #[derive(Serialize, Deserialize, Debug)]
5899 struct MyInnerRecordWriter {
5900 a: String,
5901 b: Option<MyEnum>,
5902 }
5903
5904 #[derive(Serialize, Deserialize, Debug)]
5905 struct MyRecordWriter {
5906 inner_record: Option<MyInnerRecordWriter>,
5907 }
5908
5909 let s = MyRecordWriter {
5910 inner_record: Some(MyInnerRecordWriter {
5911 a: "foo".to_string(),
5912 b: None,
5913 }),
5914 };
5915
5916 let writer_schema = Schema::parse(&writer_schema)?;
5918 let avro_value = crate::to_value(s)?;
5919 assert!(
5920 avro_value.validate(&writer_schema),
5921 "value is valid for schema",
5922 );
5923 let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5924
5925 let reader_schema = Schema::parse(&reader_schema)?;
5927 let mut x = &datum[..];
5928
5929 let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5931 assert!(deser_value.validate(&reader_schema));
5932
5933 let d: MyRecordReader = crate::from_value(&deser_value)?;
5935 assert_eq!(d.inner_record.unwrap().a, "foo".to_string());
5936 Ok(())
5937 }
5938
5939 #[test]
5940 fn test_avro_3837_disallow_invalid_namespace() -> TestResult {
5941 let schema_str = r#"
5943 {
5944 "name": "record1",
5945 "namespace": "ns1",
5946 "type": "record",
5947 "fields": []
5948 }
5949 "#;
5950
5951 let expected = r#"{"name":"ns1.record1","type":"record","fields":[]}"#;
5952 let schema = Schema::parse_str(schema_str)?;
5953 let canonical_form = schema.canonical_form();
5954 assert_eq!(canonical_form, expected);
5955
5956 let schema_str = r#"
5958 {
5959 "name": "enum1",
5960 "namespace": "ns1.foo.bar",
5961 "type": "enum",
5962 "symbols": ["a"]
5963 }
5964 "#;
5965
5966 let expected = r#"{"name":"ns1.foo.bar.enum1","type":"enum","symbols":["a"]}"#;
5967 let schema = Schema::parse_str(schema_str)?;
5968 let canonical_form = schema.canonical_form();
5969 assert_eq!(canonical_form, expected);
5970
5971 let schema_str = r#"
5973 {
5974 "name": "fixed1",
5975 "namespace": ".ns1.a.b",
5976 "type": "fixed",
5977 "size": 1
5978 }
5979 "#;
5980
5981 match Schema::parse_str(schema_str).map_err(Error::into_details) {
5982 Err(Details::InvalidNamespace(_, _)) => (),
5983 other => {
5984 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
5985 }
5986 };
5987
5988 let schema_str = r#"
5990 {
5991 "name": "record1",
5992 "namespace": "ns1.a*b.c",
5993 "type": "record",
5994 "fields": []
5995 }
5996 "#;
5997
5998 match Schema::parse_str(schema_str).map_err(Error::into_details) {
5999 Err(Details::InvalidNamespace(_, _)) => (),
6000 other => {
6001 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6002 }
6003 };
6004
6005 let schema_str = r#"
6007 {
6008 "name": "fixed1",
6009 "namespace": "ns1.1a.b",
6010 "type": "fixed",
6011 "size": 1
6012 }
6013 "#;
6014
6015 match Schema::parse_str(schema_str).map_err(Error::into_details) {
6016 Err(Details::InvalidNamespace(_, _)) => (),
6017 other => {
6018 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6019 }
6020 };
6021
6022 let schema_str = r#"
6024 {
6025 "name": "fixed1",
6026 "namespace": "ns1..a",
6027 "type": "fixed",
6028 "size": 1
6029 }
6030 "#;
6031
6032 match Schema::parse_str(schema_str).map_err(Error::into_details) {
6033 Err(Details::InvalidNamespace(_, _)) => (),
6034 other => {
6035 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6036 }
6037 };
6038
6039 let schema_str = r#"
6041 {
6042 "name": "fixed1",
6043 "namespace": "ns1.a.",
6044 "type": "fixed",
6045 "size": 1
6046 }
6047 "#;
6048
6049 match Schema::parse_str(schema_str).map_err(Error::into_details) {
6050 Err(Details::InvalidNamespace(_, _)) => (),
6051 other => {
6052 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6053 }
6054 };
6055
6056 Ok(())
6057 }
6058
6059 #[test]
6060 fn test_avro_3851_validate_default_value_of_simple_record_field() -> TestResult {
6061 let schema_str = r#"
6062 {
6063 "name": "record1",
6064 "namespace": "ns",
6065 "type": "record",
6066 "fields": [
6067 {
6068 "name": "f1",
6069 "type": "int",
6070 "default": "invalid"
6071 }
6072 ]
6073 }
6074 "#;
6075 let expected = Details::GetDefaultRecordField(
6076 "f1".to_string(),
6077 "ns.record1".to_string(),
6078 r#""int""#.to_string(),
6079 )
6080 .to_string();
6081 let result = Schema::parse_str(schema_str);
6082 assert!(result.is_err());
6083 let err = result
6084 .map_err(|e| e.to_string())
6085 .err()
6086 .unwrap_or_else(|| "unexpected".to_string());
6087 assert_eq!(expected, err);
6088
6089 Ok(())
6090 }
6091
6092 #[test]
6093 fn test_avro_3851_validate_default_value_of_nested_record_field() -> TestResult {
6094 let schema_str = r#"
6095 {
6096 "name": "record1",
6097 "namespace": "ns",
6098 "type": "record",
6099 "fields": [
6100 {
6101 "name": "f1",
6102 "type": {
6103 "name": "record2",
6104 "type": "record",
6105 "fields": [
6106 {
6107 "name": "f1_1",
6108 "type": "int"
6109 }
6110 ]
6111 },
6112 "default": "invalid"
6113 }
6114 ]
6115 }
6116 "#;
6117 let expected = Details::GetDefaultRecordField(
6118 "f1".to_string(),
6119 "ns.record1".to_string(),
6120 r#"{"name":"ns.record2","type":"record","fields":[{"name":"f1_1","type":"int"}]}"#
6121 .to_string(),
6122 )
6123 .to_string();
6124 let result = Schema::parse_str(schema_str);
6125 assert!(result.is_err());
6126 let err = result
6127 .map_err(|e| e.to_string())
6128 .err()
6129 .unwrap_or_else(|| "unexpected".to_string());
6130 assert_eq!(expected, err);
6131
6132 Ok(())
6133 }
6134
6135 #[test]
6136 fn test_avro_3851_validate_default_value_of_enum_record_field() -> TestResult {
6137 let schema_str = r#"
6138 {
6139 "name": "record1",
6140 "namespace": "ns",
6141 "type": "record",
6142 "fields": [
6143 {
6144 "name": "f1",
6145 "type": {
6146 "name": "enum1",
6147 "type": "enum",
6148 "symbols": ["a", "b", "c"]
6149 },
6150 "default": "invalid"
6151 }
6152 ]
6153 }
6154 "#;
6155 let expected = Details::GetDefaultRecordField(
6156 "f1".to_string(),
6157 "ns.record1".to_string(),
6158 r#"{"name":"ns.enum1","type":"enum","symbols":["a","b","c"]}"#.to_string(),
6159 )
6160 .to_string();
6161 let result = Schema::parse_str(schema_str);
6162 assert!(result.is_err());
6163 let err = result
6164 .map_err(|e| e.to_string())
6165 .err()
6166 .unwrap_or_else(|| "unexpected".to_string());
6167 assert_eq!(expected, err);
6168
6169 Ok(())
6170 }
6171
6172 #[test]
6173 fn test_avro_3851_validate_default_value_of_fixed_record_field() -> TestResult {
6174 let schema_str = r#"
6175 {
6176 "name": "record1",
6177 "namespace": "ns",
6178 "type": "record",
6179 "fields": [
6180 {
6181 "name": "f1",
6182 "type": {
6183 "name": "fixed1",
6184 "type": "fixed",
6185 "size": 3
6186 },
6187 "default": 100
6188 }
6189 ]
6190 }
6191 "#;
6192 let expected = Details::GetDefaultRecordField(
6193 "f1".to_string(),
6194 "ns.record1".to_string(),
6195 r#"{"name":"ns.fixed1","type":"fixed","size":3}"#.to_string(),
6196 )
6197 .to_string();
6198 let result = Schema::parse_str(schema_str);
6199 assert!(result.is_err());
6200 let err = result
6201 .map_err(|e| e.to_string())
6202 .err()
6203 .unwrap_or_else(|| "unexpected".to_string());
6204 assert_eq!(expected, err);
6205
6206 Ok(())
6207 }
6208
6209 #[test]
6210 fn test_avro_3851_validate_default_value_of_array_record_field() -> TestResult {
6211 let schema_str = r#"
6212 {
6213 "name": "record1",
6214 "namespace": "ns",
6215 "type": "record",
6216 "fields": [
6217 {
6218 "name": "f1",
6219 "type": "array",
6220 "items": "int",
6221 "default": "invalid"
6222 }
6223 ]
6224 }
6225 "#;
6226 let expected = Details::GetDefaultRecordField(
6227 "f1".to_string(),
6228 "ns.record1".to_string(),
6229 r#"{"type":"array","items":"int"}"#.to_string(),
6230 )
6231 .to_string();
6232 let result = Schema::parse_str(schema_str);
6233 assert!(result.is_err());
6234 let err = result
6235 .map_err(|e| e.to_string())
6236 .err()
6237 .unwrap_or_else(|| "unexpected".to_string());
6238 assert_eq!(expected, err);
6239
6240 Ok(())
6241 }
6242
6243 #[test]
6244 fn test_avro_3851_validate_default_value_of_map_record_field() -> TestResult {
6245 let schema_str = r#"
6246 {
6247 "name": "record1",
6248 "namespace": "ns",
6249 "type": "record",
6250 "fields": [
6251 {
6252 "name": "f1",
6253 "type": "map",
6254 "values": "string",
6255 "default": "invalid"
6256 }
6257 ]
6258 }
6259 "#;
6260 let expected = Details::GetDefaultRecordField(
6261 "f1".to_string(),
6262 "ns.record1".to_string(),
6263 r#"{"type":"map","values":"string"}"#.to_string(),
6264 )
6265 .to_string();
6266 let result = Schema::parse_str(schema_str);
6267 assert!(result.is_err());
6268 let err = result
6269 .map_err(|e| e.to_string())
6270 .err()
6271 .unwrap_or_else(|| "unexpected".to_string());
6272 assert_eq!(expected, err);
6273
6274 Ok(())
6275 }
6276
6277 #[test]
6278 fn test_avro_3851_validate_default_value_of_ref_record_field() -> TestResult {
6279 let schema_str = r#"
6280 {
6281 "name": "record1",
6282 "namespace": "ns",
6283 "type": "record",
6284 "fields": [
6285 {
6286 "name": "f1",
6287 "type": {
6288 "name": "record2",
6289 "type": "record",
6290 "fields": [
6291 {
6292 "name": "f1_1",
6293 "type": "int"
6294 }
6295 ]
6296 }
6297 }, {
6298 "name": "f2",
6299 "type": "ns.record2",
6300 "default": { "f1_1": true }
6301 }
6302 ]
6303 }
6304 "#;
6305 let expected = Details::GetDefaultRecordField(
6306 "f2".to_string(),
6307 "ns.record1".to_string(),
6308 r#""ns.record2""#.to_string(),
6309 )
6310 .to_string();
6311 let result = Schema::parse_str(schema_str);
6312 assert!(result.is_err());
6313 let err = result
6314 .map_err(|e| e.to_string())
6315 .err()
6316 .unwrap_or_else(|| "unexpected".to_string());
6317 assert_eq!(expected, err);
6318
6319 Ok(())
6320 }
6321
6322 #[test]
6323 fn test_avro_3851_validate_default_value_of_enum() -> TestResult {
6324 let schema_str = r#"
6325 {
6326 "name": "enum1",
6327 "namespace": "ns",
6328 "type": "enum",
6329 "symbols": ["a", "b", "c"],
6330 "default": 100
6331 }
6332 "#;
6333 let expected = Details::EnumDefaultWrongType(100.into()).to_string();
6334 let result = Schema::parse_str(schema_str);
6335 assert!(result.is_err());
6336 let err = result
6337 .map_err(|e| e.to_string())
6338 .err()
6339 .unwrap_or_else(|| "unexpected".to_string());
6340 assert_eq!(expected, err);
6341
6342 let schema_str = r#"
6343 {
6344 "name": "enum1",
6345 "namespace": "ns",
6346 "type": "enum",
6347 "symbols": ["a", "b", "c"],
6348 "default": "d"
6349 }
6350 "#;
6351 let expected = Details::GetEnumDefault {
6352 symbol: "d".to_string(),
6353 symbols: vec!["a".to_string(), "b".to_string(), "c".to_string()],
6354 }
6355 .to_string();
6356 let result = Schema::parse_str(schema_str);
6357 assert!(result.is_err());
6358 let err = result
6359 .map_err(|e| e.to_string())
6360 .err()
6361 .unwrap_or_else(|| "unexpected".to_string());
6362 assert_eq!(expected, err);
6363
6364 Ok(())
6365 }
6366
6367 #[test]
6368 fn test_avro_3862_get_aliases() -> TestResult {
6369 let schema_str = r#"
6371 {
6372 "name": "record1",
6373 "namespace": "ns1",
6374 "type": "record",
6375 "aliases": ["r1", "ns2.r2"],
6376 "fields": [
6377 { "name": "f1", "type": "int" },
6378 { "name": "f2", "type": "string" }
6379 ]
6380 }
6381 "#;
6382 let schema = Schema::parse_str(schema_str)?;
6383 let expected = vec![Alias::new("ns1.r1")?, Alias::new("ns2.r2")?];
6384 match schema.aliases() {
6385 Some(aliases) => assert_eq!(aliases, &expected),
6386 None => panic!("Expected Some({expected:?}), got None"),
6387 }
6388
6389 let schema_str = r#"
6390 {
6391 "name": "record1",
6392 "namespace": "ns1",
6393 "type": "record",
6394 "fields": [
6395 { "name": "f1", "type": "int" },
6396 { "name": "f2", "type": "string" }
6397 ]
6398 }
6399 "#;
6400 let schema = Schema::parse_str(schema_str)?;
6401 match schema.aliases() {
6402 None => (),
6403 some => panic!("Expected None, got {some:?}"),
6404 }
6405
6406 let schema_str = r#"
6408 {
6409 "name": "enum1",
6410 "namespace": "ns1",
6411 "type": "enum",
6412 "aliases": ["en1", "ns2.en2"],
6413 "symbols": ["a", "b", "c"]
6414 }
6415 "#;
6416 let schema = Schema::parse_str(schema_str)?;
6417 let expected = vec![Alias::new("ns1.en1")?, Alias::new("ns2.en2")?];
6418 match schema.aliases() {
6419 Some(aliases) => assert_eq!(aliases, &expected),
6420 None => panic!("Expected Some({expected:?}), got None"),
6421 }
6422
6423 let schema_str = r#"
6424 {
6425 "name": "enum1",
6426 "namespace": "ns1",
6427 "type": "enum",
6428 "symbols": ["a", "b", "c"]
6429 }
6430 "#;
6431 let schema = Schema::parse_str(schema_str)?;
6432 match schema.aliases() {
6433 None => (),
6434 some => panic!("Expected None, got {some:?}"),
6435 }
6436
6437 let schema_str = r#"
6439 {
6440 "name": "fixed1",
6441 "namespace": "ns1",
6442 "type": "fixed",
6443 "aliases": ["fx1", "ns2.fx2"],
6444 "size": 10
6445 }
6446 "#;
6447 let schema = Schema::parse_str(schema_str)?;
6448 let expected = vec![Alias::new("ns1.fx1")?, Alias::new("ns2.fx2")?];
6449 match schema.aliases() {
6450 Some(aliases) => assert_eq!(aliases, &expected),
6451 None => panic!("Expected Some({expected:?}), got None"),
6452 }
6453
6454 let schema_str = r#"
6455 {
6456 "name": "fixed1",
6457 "namespace": "ns1",
6458 "type": "fixed",
6459 "size": 10
6460 }
6461 "#;
6462 let schema = Schema::parse_str(schema_str)?;
6463 match schema.aliases() {
6464 None => (),
6465 some => panic!("Expected None, got {some:?}"),
6466 }
6467
6468 let schema = Schema::Int;
6470 match schema.aliases() {
6471 None => (),
6472 some => panic!("Expected None, got {some:?}"),
6473 }
6474
6475 Ok(())
6476 }
6477
6478 #[test]
6479 fn test_avro_3862_get_doc() -> TestResult {
6480 let schema_str = r#"
6482 {
6483 "name": "record1",
6484 "type": "record",
6485 "doc": "Record Document",
6486 "fields": [
6487 { "name": "f1", "type": "int" },
6488 { "name": "f2", "type": "string" }
6489 ]
6490 }
6491 "#;
6492 let schema = Schema::parse_str(schema_str)?;
6493 let expected = "Record Document";
6494 match schema.doc() {
6495 Some(doc) => assert_eq!(doc, expected),
6496 None => panic!("Expected Some({expected:?}), got None"),
6497 }
6498
6499 let schema_str = r#"
6500 {
6501 "name": "record1",
6502 "type": "record",
6503 "fields": [
6504 { "name": "f1", "type": "int" },
6505 { "name": "f2", "type": "string" }
6506 ]
6507 }
6508 "#;
6509 let schema = Schema::parse_str(schema_str)?;
6510 match schema.doc() {
6511 None => (),
6512 some => panic!("Expected None, got {some:?}"),
6513 }
6514
6515 let schema_str = r#"
6517 {
6518 "name": "enum1",
6519 "type": "enum",
6520 "doc": "Enum Document",
6521 "symbols": ["a", "b", "c"]
6522 }
6523 "#;
6524 let schema = Schema::parse_str(schema_str)?;
6525 let expected = "Enum Document";
6526 match schema.doc() {
6527 Some(doc) => assert_eq!(doc, expected),
6528 None => panic!("Expected Some({expected:?}), got None"),
6529 }
6530
6531 let schema_str = r#"
6532 {
6533 "name": "enum1",
6534 "type": "enum",
6535 "symbols": ["a", "b", "c"]
6536 }
6537 "#;
6538 let schema = Schema::parse_str(schema_str)?;
6539 match schema.doc() {
6540 None => (),
6541 some => panic!("Expected None, got {some:?}"),
6542 }
6543
6544 let schema_str = r#"
6546 {
6547 "name": "fixed1",
6548 "type": "fixed",
6549 "doc": "Fixed Document",
6550 "size": 10
6551 }
6552 "#;
6553 let schema = Schema::parse_str(schema_str)?;
6554 let expected = "Fixed Document";
6555 match schema.doc() {
6556 Some(doc) => assert_eq!(doc, expected),
6557 None => panic!("Expected Some({expected:?}), got None"),
6558 }
6559
6560 let schema_str = r#"
6561 {
6562 "name": "fixed1",
6563 "type": "fixed",
6564 "size": 10
6565 }
6566 "#;
6567 let schema = Schema::parse_str(schema_str)?;
6568 match schema.doc() {
6569 None => (),
6570 some => panic!("Expected None, got {some:?}"),
6571 }
6572
6573 let schema = Schema::Int;
6575 match schema.doc() {
6576 None => (),
6577 some => panic!("Expected None, got {some:?}"),
6578 }
6579
6580 Ok(())
6581 }
6582
6583 #[test]
6584 fn avro_3886_serialize_attributes() -> TestResult {
6585 let attributes = BTreeMap::from([
6586 ("string_key".into(), "value".into()),
6587 ("number_key".into(), 1.23.into()),
6588 ("null_key".into(), Value::Null),
6589 (
6590 "array_key".into(),
6591 Value::Array(vec![1.into(), 2.into(), 3.into()]),
6592 ),
6593 ("object_key".into(), Value::Object(Map::default())),
6594 ]);
6595
6596 let schema = Schema::Enum(EnumSchema {
6598 name: Name::new("a")?,
6599 aliases: None,
6600 doc: None,
6601 symbols: vec![],
6602 default: None,
6603 attributes: attributes.clone(),
6604 });
6605 let serialized = serde_json::to_string(&schema)?;
6606 assert_eq!(
6607 r#"{"type":"enum","name":"a","symbols":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6608 &serialized
6609 );
6610
6611 let schema = Schema::Fixed(FixedSchema {
6613 name: Name::new("a")?,
6614 aliases: None,
6615 doc: None,
6616 size: 1,
6617 default: None,
6618 attributes: attributes.clone(),
6619 });
6620 let serialized = serde_json::to_string(&schema)?;
6621 assert_eq!(
6622 r#"{"type":"fixed","name":"a","size":1,"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6623 &serialized
6624 );
6625
6626 let schema = Schema::Record(RecordSchema {
6628 name: Name::new("a")?,
6629 aliases: None,
6630 doc: None,
6631 fields: vec![],
6632 lookup: BTreeMap::new(),
6633 attributes,
6634 });
6635 let serialized = serde_json::to_string(&schema)?;
6636 assert_eq!(
6637 r#"{"type":"record","name":"a","fields":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6638 &serialized
6639 );
6640
6641 Ok(())
6642 }
6643
6644 #[test]
6647 fn test_avro_3897_funny_valid_names_and_namespaces() -> TestResult {
6648 for funny_name in ["_", "_._", "__._", "_.__", "_._._"] {
6649 let name = Name::new(funny_name);
6650 assert!(name.is_ok());
6651 }
6652 Ok(())
6653 }
6654
6655 #[test]
6656 fn test_avro_3896_decimal_schema() -> TestResult {
6657 let schema = json!(
6659 {
6660 "type": "bytes",
6661 "name": "BytesDecimal",
6662 "logicalType": "decimal",
6663 "size": 38,
6664 "precision": 9,
6665 "scale": 2
6666 });
6667 let parse_result = Schema::parse(&schema)?;
6668 assert!(matches!(
6669 parse_result,
6670 Schema::Decimal(DecimalSchema {
6671 precision: 9,
6672 scale: 2,
6673 ..
6674 })
6675 ));
6676
6677 let schema = json!(
6679 {
6680 "type": "long",
6681 "name": "LongDecimal",
6682 "logicalType": "decimal"
6683 });
6684 let parse_result = Schema::parse(&schema)?;
6685 assert_eq!(parse_result, Schema::Long);
6687
6688 Ok(())
6689 }
6690
6691 #[test]
6692 fn avro_3896_uuid_schema_for_string() -> TestResult {
6693 let schema = json!(
6695 {
6696 "type": "string",
6697 "name": "StringUUID",
6698 "logicalType": "uuid"
6699 });
6700 let parse_result = Schema::parse(&schema)?;
6701 assert_eq!(parse_result, Schema::Uuid);
6702
6703 Ok(())
6704 }
6705
6706 #[test]
6707 fn avro_3926_uuid_schema_for_fixed_with_size_16() -> TestResult {
6708 let schema = json!(
6709 {
6710 "type": "fixed",
6711 "name": "FixedUUID",
6712 "size": 16,
6713 "logicalType": "uuid"
6714 });
6715 let parse_result = Schema::parse(&schema)?;
6716 assert_eq!(parse_result, Schema::Uuid);
6717 assert_not_logged(
6718 r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, attributes: {"logicalType": String("uuid")} })"#,
6719 );
6720
6721 Ok(())
6722 }
6723
6724 #[test]
6725 fn avro_3926_uuid_schema_for_fixed_with_size_different_than_16() -> TestResult {
6726 let schema = json!(
6727 {
6728 "type": "fixed",
6729 "name": "FixedUUID",
6730 "size": 6,
6731 "logicalType": "uuid"
6732 });
6733 let parse_result = Schema::parse(&schema)?;
6734
6735 assert_eq!(
6736 parse_result,
6737 Schema::Fixed(FixedSchema {
6738 name: Name::new("FixedUUID")?,
6739 aliases: None,
6740 doc: None,
6741 size: 6,
6742 default: None,
6743 attributes: BTreeMap::from([("logicalType".to_string(), "uuid".into())]),
6744 })
6745 );
6746 assert_logged(
6747 r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, default: None, attributes: {"logicalType": String("uuid")} })"#,
6748 );
6749
6750 Ok(())
6751 }
6752
6753 #[test]
6754 fn test_avro_3896_timestamp_millis_schema() -> TestResult {
6755 let schema = json!(
6757 {
6758 "type": "long",
6759 "name": "LongTimestampMillis",
6760 "logicalType": "timestamp-millis"
6761 });
6762 let parse_result = Schema::parse(&schema)?;
6763 assert_eq!(parse_result, Schema::TimestampMillis);
6764
6765 let schema = json!(
6767 {
6768 "type": "int",
6769 "name": "IntTimestampMillis",
6770 "logicalType": "timestamp-millis"
6771 });
6772 let parse_result = Schema::parse(&schema)?;
6773 assert_eq!(parse_result, Schema::Int);
6774
6775 Ok(())
6776 }
6777
6778 #[test]
6779 fn test_avro_3896_custom_bytes_schema() -> TestResult {
6780 let schema = json!(
6782 {
6783 "type": "bytes",
6784 "name": "BytesLog",
6785 "logicalType": "custom"
6786 });
6787 let parse_result = Schema::parse(&schema)?;
6788 assert_eq!(parse_result, Schema::Bytes);
6789 assert_eq!(parse_result.custom_attributes(), None);
6790
6791 Ok(())
6792 }
6793
6794 #[test]
6795 fn test_avro_3899_parse_decimal_type() -> TestResult {
6796 let schema = Schema::parse_str(
6797 r#"{
6798 "name": "InvalidDecimal",
6799 "type": "fixed",
6800 "size": 16,
6801 "logicalType": "decimal",
6802 "precision": 2,
6803 "scale": 3
6804 }"#,
6805 )?;
6806 match schema {
6807 Schema::Fixed(fixed_schema) => {
6808 let attrs = fixed_schema.attributes;
6809 let precision = attrs
6810 .get("precision")
6811 .expect("The 'precision' attribute is missing");
6812 let scale = attrs
6813 .get("scale")
6814 .expect("The 'scale' attribute is missing");
6815 assert_logged(&format!(
6816 "Ignoring invalid decimal logical type: The decimal precision ({precision}) must be bigger or equal to the scale ({scale})"
6817 ));
6818 }
6819 _ => unreachable!("Expected Schema::Fixed, got {:?}", schema),
6820 }
6821
6822 let schema = Schema::parse_str(
6823 r#"{
6824 "name": "ValidDecimal",
6825 "type": "bytes",
6826 "logicalType": "decimal",
6827 "precision": 3,
6828 "scale": 2
6829 }"#,
6830 )?;
6831 match schema {
6832 Schema::Decimal(_) => {
6833 assert_not_logged(
6834 "Ignoring invalid decimal logical type: The decimal precision (2) must be bigger or equal to the scale (3)",
6835 );
6836 }
6837 _ => unreachable!("Expected Schema::Decimal, got {:?}", schema),
6838 }
6839
6840 Ok(())
6841 }
6842
6843 #[test]
6844 fn avro_3920_serialize_record_with_custom_attributes() -> TestResult {
6845 let expected = {
6846 let mut lookup = BTreeMap::new();
6847 lookup.insert("value".to_owned(), 0);
6848 Schema::Record(RecordSchema {
6849 name: Name {
6850 name: "LongList".to_owned(),
6851 namespace: None,
6852 },
6853 aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
6854 doc: None,
6855 fields: vec![RecordField {
6856 name: "value".to_string(),
6857 doc: None,
6858 default: None,
6859 aliases: None,
6860 schema: Schema::Long,
6861 order: RecordFieldOrder::Ascending,
6862 position: 0,
6863 custom_attributes: BTreeMap::from([("field-id".to_string(), 1.into())]),
6864 }],
6865 lookup,
6866 attributes: BTreeMap::from([("custom-attribute".to_string(), "value".into())]),
6867 })
6868 };
6869
6870 let value = serde_json::to_value(&expected)?;
6871 let serialized = serde_json::to_string(&value)?;
6872 assert_eq!(
6873 r#"{"aliases":["LinkedLongs"],"custom-attribute":"value","fields":[{"field-id":1,"name":"value","type":"long"}],"name":"LongList","type":"record"}"#,
6874 &serialized
6875 );
6876 assert_eq!(expected, Schema::parse_str(&serialized)?);
6877
6878 Ok(())
6879 }
6880
6881 #[test]
6882 fn test_avro_3925_serialize_decimal_inner_fixed() -> TestResult {
6883 let schema = Schema::Decimal(DecimalSchema {
6884 precision: 36,
6885 scale: 10,
6886 inner: Box::new(Schema::Fixed(FixedSchema {
6887 name: Name::new("decimal_36_10").unwrap(),
6888 aliases: None,
6889 doc: None,
6890 size: 16,
6891 default: None,
6892 attributes: Default::default(),
6893 })),
6894 });
6895
6896 let serialized_json = serde_json::to_string_pretty(&schema)?;
6897
6898 let expected_json = r#"{
6899 "type": "fixed",
6900 "name": "decimal_36_10",
6901 "size": 16,
6902 "logicalType": "decimal",
6903 "scale": 10,
6904 "precision": 36
6905}"#;
6906
6907 assert_eq!(serialized_json, expected_json);
6908
6909 Ok(())
6910 }
6911
6912 #[test]
6913 fn test_avro_3925_serialize_decimal_inner_bytes() -> TestResult {
6914 let schema = Schema::Decimal(DecimalSchema {
6915 precision: 36,
6916 scale: 10,
6917 inner: Box::new(Schema::Bytes),
6918 });
6919
6920 let serialized_json = serde_json::to_string_pretty(&schema)?;
6921
6922 let expected_json = r#"{
6923 "type": "bytes",
6924 "logicalType": "decimal",
6925 "scale": 10,
6926 "precision": 36
6927}"#;
6928
6929 assert_eq!(serialized_json, expected_json);
6930
6931 Ok(())
6932 }
6933
6934 #[test]
6935 fn test_avro_3925_serialize_decimal_inner_invalid() -> TestResult {
6936 let schema = Schema::Decimal(DecimalSchema {
6937 precision: 36,
6938 scale: 10,
6939 inner: Box::new(Schema::String),
6940 });
6941
6942 let serialized_json = serde_json::to_string_pretty(&schema);
6943
6944 assert!(serialized_json.is_err());
6945
6946 Ok(())
6947 }
6948
6949 #[test]
6950 fn test_avro_3927_serialize_array_with_custom_attributes() -> TestResult {
6951 let expected = Schema::array_with_attributes(
6952 Schema::Long,
6953 BTreeMap::from([("field-id".to_string(), "1".into())]),
6954 );
6955
6956 let value = serde_json::to_value(&expected)?;
6957 let serialized = serde_json::to_string(&value)?;
6958 assert_eq!(
6959 r#"{"field-id":"1","items":"long","type":"array"}"#,
6960 &serialized
6961 );
6962 let actual_schema = Schema::parse_str(&serialized)?;
6963 assert_eq!(expected, actual_schema);
6964 assert_eq!(
6965 expected.custom_attributes(),
6966 actual_schema.custom_attributes()
6967 );
6968
6969 Ok(())
6970 }
6971
6972 #[test]
6973 fn test_avro_3927_serialize_map_with_custom_attributes() -> TestResult {
6974 let expected = Schema::map_with_attributes(
6975 Schema::Long,
6976 BTreeMap::from([("field-id".to_string(), "1".into())]),
6977 );
6978
6979 let value = serde_json::to_value(&expected)?;
6980 let serialized = serde_json::to_string(&value)?;
6981 assert_eq!(
6982 r#"{"field-id":"1","type":"map","values":"long"}"#,
6983 &serialized
6984 );
6985 let actual_schema = Schema::parse_str(&serialized)?;
6986 assert_eq!(expected, actual_schema);
6987 assert_eq!(
6988 expected.custom_attributes(),
6989 actual_schema.custom_attributes()
6990 );
6991
6992 Ok(())
6993 }
6994
6995 #[test]
6996 fn avro_3928_parse_int_based_schema_with_default() -> TestResult {
6997 let schema = r#"
6998 {
6999 "type": "record",
7000 "name": "DateLogicalType",
7001 "fields": [ {
7002 "name": "birthday",
7003 "type": {"type": "int", "logicalType": "date"},
7004 "default": 1681601653
7005 } ]
7006 }"#;
7007
7008 match Schema::parse_str(schema)? {
7009 Schema::Record(record_schema) => {
7010 assert_eq!(record_schema.fields.len(), 1);
7011 let field = record_schema.fields.first().unwrap();
7012 assert_eq!(field.name, "birthday");
7013 assert_eq!(field.schema, Schema::Date);
7014 assert_eq!(
7015 types::Value::from(field.default.clone().unwrap()),
7016 types::Value::Int(1681601653)
7017 );
7018 }
7019 _ => unreachable!("Expected Schema::Record"),
7020 }
7021
7022 Ok(())
7023 }
7024
7025 #[test]
7026 fn avro_3946_union_with_single_type() -> TestResult {
7027 let schema = r#"
7028 {
7029 "type": "record",
7030 "name": "Issue",
7031 "namespace": "invalid.example",
7032 "fields": [
7033 {
7034 "name": "myField",
7035 "type": ["long"]
7036 }
7037 ]
7038 }"#;
7039
7040 let _ = Schema::parse_str(schema)?;
7041
7042 assert_logged(
7043 "Union schema with just one member! Consider dropping the union! \
7044 Please enable debug logging to find out which Record schema \
7045 declares the union with 'RUST_LOG=apache_avro::schema=debug'.",
7046 );
7047
7048 Ok(())
7049 }
7050
7051 #[test]
7052 fn avro_3946_union_without_any_types() -> TestResult {
7053 let schema = r#"
7054 {
7055 "type": "record",
7056 "name": "Issue",
7057 "namespace": "invalid.example",
7058 "fields": [
7059 {
7060 "name": "myField",
7061 "type": []
7062 }
7063 ]
7064 }"#;
7065
7066 let _ = Schema::parse_str(schema)?;
7067
7068 assert_logged(
7069 "Union schemas should have at least two members! \
7070 Please enable debug logging to find out which Record schema \
7071 declares the union with 'RUST_LOG=apache_avro::schema=debug'.",
7072 );
7073
7074 Ok(())
7075 }
7076
7077 #[test]
7078 fn avro_3965_fixed_schema_with_default_bigger_than_size() -> TestResult {
7079 match Schema::parse_str(
7080 r#"{
7081 "type": "fixed",
7082 "name": "test",
7083 "size": 1,
7084 "default": "123456789"
7085 }"#,
7086 ) {
7087 Ok(_schema) => panic!("Must fail!"),
7088 Err(err) => {
7089 assert_eq!(
7090 err.to_string(),
7091 "Fixed schema's default value length (9) does not match its size (1)"
7092 );
7093 }
7094 }
7095
7096 Ok(())
7097 }
7098
7099 #[test]
7100 fn avro_4004_canonical_form_strip_logical_types() -> TestResult {
7101 let schema_str = r#"
7102 {
7103 "type": "record",
7104 "name": "test",
7105 "fields": [
7106 {"name": "a", "type": "long", "default": 42, "doc": "The field a"},
7107 {"name": "b", "type": "string", "namespace": "test.a"},
7108 {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
7109 ]
7110 }"#;
7111
7112 let schema = Schema::parse_str(schema_str)?;
7113 let canonical_form = schema.canonical_form();
7114 let fp_rabin = schema.fingerprint::<Rabin>();
7115 assert_eq!(
7116 r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"},{"name":"c","type":{"type":"long"}}]}"#,
7117 canonical_form
7118 );
7119 assert_eq!("92f2ccef718c6754", fp_rabin.to_string());
7120 Ok(())
7121 }
7122
7123 #[test]
7124 fn avro_4055_should_fail_to_parse_invalid_schema() -> TestResult {
7125 let invalid_schema_str = r#"
7127 {
7128 "type": "record",
7129 "name": "SampleSchema",
7130 "fields": [
7131 {
7132 "name": "order",
7133 "type": "record",
7134 "fields": [
7135 {
7136 "name": "order_number",
7137 "type": ["null", "string"],
7138 "default": null
7139 },
7140 { "name": "order_date", "type": "string" }
7141 ]
7142 }
7143 ]
7144 }"#;
7145
7146 let schema = Schema::parse_str(invalid_schema_str);
7147 assert!(schema.is_err());
7148 assert_eq!(
7149 schema.unwrap_err().to_string(),
7150 "Invalid schema: There is no type called 'record', if you meant to define a non-primitive schema, it should be defined inside `type` attribute. Please review the specification"
7151 );
7152
7153 let valid_schema = r#"
7154 {
7155 "type": "record",
7156 "name": "SampleSchema",
7157 "fields": [
7158 {
7159 "name": "order",
7160 "type": {
7161 "type": "record",
7162 "name": "Order",
7163 "fields": [
7164 {
7165 "name": "order_number",
7166 "type": ["null", "string"],
7167 "default": null
7168 },
7169 { "name": "order_date", "type": "string" }
7170 ]
7171 }
7172 }
7173 ]
7174 }"#;
7175 let schema = Schema::parse_str(valid_schema);
7176 assert!(schema.is_ok());
7177
7178 Ok(())
7179 }
7180
7181 #[test]
7182 fn avro_rs_292_array_items_should_be_ignored_in_custom_attributes() -> TestResult {
7183 let raw_schema = r#"{
7184 "type": "array",
7185 "items": {
7186 "name": "foo",
7187 "type": "record",
7188 "fields": [
7189 {
7190 "name": "bar",
7191 "type": "array",
7192 "items": {
7193 "type": "record",
7194 "name": "baz",
7195 "fields": [
7196 {
7197 "name": "quux",
7198 "type": "int"
7199 }
7200 ]
7201 }
7202 }
7203 ]
7204 }
7205 }"#;
7206
7207 let schema1 = Schema::parse_str(raw_schema)?;
7208 match &schema1 {
7209 Schema::Array(ArraySchema { items, attributes }) => {
7210 assert!(attributes.is_empty());
7211
7212 match **items {
7213 Schema::Record(RecordSchema {
7214 ref name,
7215 aliases: _,
7216 doc: _,
7217 ref fields,
7218 lookup: _,
7219 ref attributes,
7220 }) => {
7221 assert_eq!(name.to_string(), "foo");
7222 assert_eq!(fields.len(), 1);
7223 assert!(attributes.is_empty());
7224
7225 match &fields[0].schema {
7226 Schema::Array(ArraySchema {
7227 items: _,
7228 attributes,
7229 }) => {
7230 assert!(attributes.is_empty());
7231 }
7232 _ => panic!("Expected ArraySchema. got: {}", &fields[0].schema),
7233 }
7234 }
7235 _ => panic!("Expected RecordSchema. got: {}", &items),
7236 }
7237 }
7238 _ => panic!("Expected ArraySchema. got: {}", &schema1),
7239 }
7240 let canonical_form1 = schema1.canonical_form();
7241 let schema2 = Schema::parse_str(&canonical_form1)?;
7242 let canonical_form2 = schema2.canonical_form();
7243
7244 assert_eq!(canonical_form1, canonical_form2);
7245
7246 Ok(())
7247 }
7248
7249 #[test]
7250 fn avro_rs_292_map_values_should_be_ignored_in_custom_attributes() -> TestResult {
7251 let raw_schema = r#"{
7252 "type": "array",
7253 "items": {
7254 "name": "foo",
7255 "type": "record",
7256 "fields": [
7257 {
7258 "name": "bar",
7259 "type": "map",
7260 "values": {
7261 "type": "record",
7262 "name": "baz",
7263 "fields": [
7264 {
7265 "name": "quux",
7266 "type": "int"
7267 }
7268 ]
7269 }
7270 }
7271 ]
7272 }
7273 }"#;
7274
7275 let schema1 = Schema::parse_str(raw_schema)?;
7276 match &schema1 {
7277 Schema::Array(ArraySchema { items, attributes }) => {
7278 assert!(attributes.is_empty());
7279
7280 match **items {
7281 Schema::Record(RecordSchema {
7282 ref name,
7283 aliases: _,
7284 doc: _,
7285 ref fields,
7286 lookup: _,
7287 ref attributes,
7288 }) => {
7289 assert_eq!(name.to_string(), "foo");
7290 assert_eq!(fields.len(), 1);
7291 assert!(attributes.is_empty());
7292
7293 match &fields[0].schema {
7294 Schema::Map(MapSchema {
7295 types: _,
7296 attributes,
7297 }) => {
7298 assert!(attributes.is_empty());
7299 }
7300 _ => panic!("Expected MapSchema. got: {}", &fields[0].schema),
7301 }
7302 }
7303 _ => panic!("Expected RecordSchema. got: {}", &items),
7304 }
7305 }
7306 _ => panic!("Expected ArraySchema. got: {}", &schema1),
7307 }
7308 let canonical_form1 = schema1.canonical_form();
7309 println!("Canonical Form 1: {}", &canonical_form1);
7310 let schema2 = Schema::parse_str(&canonical_form1)?;
7311 let canonical_form2 = schema2.canonical_form();
7312
7313 assert_eq!(canonical_form1, canonical_form2);
7314
7315 Ok(())
7316 }
7317}