1use crate::{
20 AvroResult,
21 error::{Details, Error},
22 schema_equality, types,
23 util::MapHelper,
24 validator::{
25 validate_enum_symbol_name, validate_namespace, validate_record_field_name,
26 validate_schema_name,
27 },
28};
29use digest::Digest;
30use log::{debug, error, warn};
31use serde::{
32 Deserialize, Serialize, Serializer,
33 ser::{SerializeMap, SerializeSeq},
34};
35use serde_json::{Map, Value};
36use std::{
37 borrow::Borrow,
38 collections::{BTreeMap, HashMap, HashSet},
39 fmt,
40 fmt::Debug,
41 hash::Hash,
42 io::Read,
43 str::FromStr,
44};
45use strum_macros::{Display, EnumDiscriminants, EnumString};
46
47pub struct SchemaFingerprint {
51 pub bytes: Vec<u8>,
52}
53
54impl fmt::Display for SchemaFingerprint {
55 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
56 write!(
57 f,
58 "{}",
59 self.bytes
60 .iter()
61 .map(|byte| format!("{byte:02x}"))
62 .collect::<Vec<String>>()
63 .join("")
64 )
65 }
66}
67
68#[derive(Clone, Debug, EnumDiscriminants, Display)]
72#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
73pub enum Schema {
74 Null,
76 Boolean,
78 Int,
80 Long,
82 Float,
84 Double,
86 Bytes,
89 String,
92 Array(ArraySchema),
95 Map(MapSchema),
99 Union(UnionSchema),
101 Record(RecordSchema),
103 Enum(EnumSchema),
105 Fixed(FixedSchema),
107 Decimal(DecimalSchema),
110 BigDecimal,
113 Uuid,
115 Date,
118 TimeMillis,
121 TimeMicros,
124 TimestampMillis,
126 TimestampMicros,
128 TimestampNanos,
130 LocalTimestampMillis,
132 LocalTimestampMicros,
134 LocalTimestampNanos,
136 Duration,
138 Ref { name: Name },
140}
141
142#[derive(Clone, Debug, PartialEq)]
143pub struct MapSchema {
144 pub types: Box<Schema>,
145 pub attributes: BTreeMap<String, Value>,
146}
147
148#[derive(Clone, Debug, PartialEq)]
149pub struct ArraySchema {
150 pub items: Box<Schema>,
151 pub attributes: BTreeMap<String, Value>,
152}
153
154impl PartialEq for Schema {
155 fn eq(&self, other: &Self) -> bool {
160 schema_equality::compare_schemata(self, other)
161 }
162}
163
164impl SchemaKind {
165 pub fn is_primitive(self) -> bool {
166 matches!(
167 self,
168 SchemaKind::Null
169 | SchemaKind::Boolean
170 | SchemaKind::Int
171 | SchemaKind::Long
172 | SchemaKind::Double
173 | SchemaKind::Float
174 | SchemaKind::Bytes
175 | SchemaKind::String,
176 )
177 }
178
179 pub fn is_named(self) -> bool {
180 matches!(
181 self,
182 SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref
183 )
184 }
185}
186
187impl From<&types::Value> for SchemaKind {
188 fn from(value: &types::Value) -> Self {
189 use crate::types::Value;
190 match value {
191 Value::Null => Self::Null,
192 Value::Boolean(_) => Self::Boolean,
193 Value::Int(_) => Self::Int,
194 Value::Long(_) => Self::Long,
195 Value::Float(_) => Self::Float,
196 Value::Double(_) => Self::Double,
197 Value::Bytes(_) => Self::Bytes,
198 Value::String(_) => Self::String,
199 Value::Array(_) => Self::Array,
200 Value::Map(_) => Self::Map,
201 Value::Union(_, _) => Self::Union,
202 Value::Record(_) => Self::Record,
203 Value::Enum(_, _) => Self::Enum,
204 Value::Fixed(_, _) => Self::Fixed,
205 Value::Decimal { .. } => Self::Decimal,
206 Value::BigDecimal(_) => Self::BigDecimal,
207 Value::Uuid(_) => Self::Uuid,
208 Value::Date(_) => Self::Date,
209 Value::TimeMillis(_) => Self::TimeMillis,
210 Value::TimeMicros(_) => Self::TimeMicros,
211 Value::TimestampMillis(_) => Self::TimestampMillis,
212 Value::TimestampMicros(_) => Self::TimestampMicros,
213 Value::TimestampNanos(_) => Self::TimestampNanos,
214 Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
215 Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
216 Value::LocalTimestampNanos(_) => Self::LocalTimestampNanos,
217 Value::Duration { .. } => Self::Duration,
218 }
219 }
220}
221
222#[derive(Clone, Debug, Hash, PartialEq, Eq)]
233pub struct Name {
234 pub name: String,
235 pub namespace: Namespace,
236}
237
238pub type Documentation = Option<String>;
240pub type Aliases = Option<Vec<Alias>>;
242pub(crate) type Names = HashMap<Name, Schema>;
244pub type NamesRef<'a> = HashMap<Name, &'a Schema>;
246pub type Namespace = Option<String>;
248
249impl Name {
250 pub fn new(name: &str) -> AvroResult<Self> {
254 let (name, namespace) = Name::get_name_and_namespace(name)?;
255 Ok(Self {
256 name,
257 namespace: namespace.filter(|ns| !ns.is_empty()),
258 })
259 }
260
261 fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
262 validate_schema_name(name)
263 }
264
265 pub(crate) fn parse(
267 complex: &Map<String, Value>,
268 enclosing_namespace: &Namespace,
269 ) -> AvroResult<Self> {
270 let (name, namespace_from_name) = complex
271 .name()
272 .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
273 .ok_or(Details::GetNameField)?;
274 let type_name = match complex.get("type") {
276 Some(Value::Object(complex_type)) => complex_type.name().or(None),
277 _ => None,
278 };
279
280 let namespace = namespace_from_name
281 .or_else(|| {
282 complex
283 .string("namespace")
284 .or_else(|| enclosing_namespace.clone())
285 })
286 .filter(|ns| !ns.is_empty());
287
288 if let Some(ref ns) = namespace {
289 validate_namespace(ns)?;
290 }
291
292 Ok(Self {
293 name: type_name.unwrap_or(name),
294 namespace,
295 })
296 }
297
298 pub fn fullname(&self, default_namespace: Namespace) -> String {
303 if self.name.contains('.') {
304 self.name.clone()
305 } else {
306 let namespace = self.namespace.clone().or(default_namespace);
307
308 match namespace {
309 Some(ref namespace) if !namespace.is_empty() => {
310 format!("{}.{}", namespace, self.name)
311 }
312 _ => self.name.clone(),
313 }
314 }
315 }
316
317 pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
331 Name {
332 name: self.name.clone(),
333 namespace: self
334 .namespace
335 .clone()
336 .or_else(|| enclosing_namespace.clone().filter(|ns| !ns.is_empty())),
337 }
338 }
339}
340
341impl From<&str> for Name {
342 fn from(name: &str) -> Self {
343 Name::new(name).unwrap()
344 }
345}
346
347impl fmt::Display for Name {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 f.write_str(&self.fullname(None)[..])
350 }
351}
352
353impl<'de> Deserialize<'de> for Name {
354 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
355 where
356 D: serde::de::Deserializer<'de>,
357 {
358 Value::deserialize(deserializer).and_then(|value| {
359 use serde::de::Error;
360 if let Value::Object(json) = value {
361 Name::parse(&json, &None).map_err(Error::custom)
362 } else {
363 Err(Error::custom(format!("Expected a JSON object: {value:?}")))
364 }
365 })
366 }
367}
368
369#[derive(Clone, Debug, Hash, PartialEq, Eq)]
372pub struct Alias(Name);
373
374impl Alias {
375 pub fn new(name: &str) -> AvroResult<Self> {
376 Name::new(name).map(Self)
377 }
378
379 pub fn name(&self) -> String {
380 self.0.name.clone()
381 }
382
383 pub fn namespace(&self) -> Namespace {
384 self.0.namespace.clone()
385 }
386
387 pub fn fullname(&self, default_namespace: Namespace) -> String {
388 self.0.fullname(default_namespace)
389 }
390
391 pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name {
392 self.0.fully_qualified_name(default_namespace)
393 }
394}
395
396impl From<&str> for Alias {
397 fn from(name: &str) -> Self {
398 Alias::new(name).unwrap()
399 }
400}
401
402impl Serialize for Alias {
403 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
404 where
405 S: Serializer,
406 {
407 serializer.serialize_str(&self.fullname(None))
408 }
409}
410
411#[derive(Debug)]
412pub struct ResolvedSchema<'s> {
413 names_ref: NamesRef<'s>,
414 schemata: Vec<&'s Schema>,
415}
416
417impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
418 type Error = Error;
419
420 fn try_from(schema: &'s Schema) -> AvroResult<Self> {
421 let names = HashMap::new();
422 let mut rs = ResolvedSchema {
423 names_ref: names,
424 schemata: vec![schema],
425 };
426 rs.resolve(rs.get_schemata(), &None, None)?;
427 Ok(rs)
428 }
429}
430
431impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
432 type Error = Error;
433
434 fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
435 let names = HashMap::new();
436 let mut rs = ResolvedSchema {
437 names_ref: names,
438 schemata,
439 };
440 rs.resolve(rs.get_schemata(), &None, None)?;
441 Ok(rs)
442 }
443}
444
445impl<'s> ResolvedSchema<'s> {
446 pub fn get_schemata(&self) -> Vec<&'s Schema> {
447 self.schemata.clone()
448 }
449
450 pub fn get_names(&self) -> &NamesRef<'s> {
451 &self.names_ref
452 }
453
454 pub fn new_with_known_schemata<'n>(
458 schemata_to_resolve: Vec<&'s Schema>,
459 enclosing_namespace: &Namespace,
460 known_schemata: &'n NamesRef<'n>,
461 ) -> AvroResult<Self> {
462 let names = HashMap::new();
463 let mut rs = ResolvedSchema {
464 names_ref: names,
465 schemata: schemata_to_resolve,
466 };
467 rs.resolve(rs.get_schemata(), enclosing_namespace, Some(known_schemata))?;
468 Ok(rs)
469 }
470
471 fn resolve<'n>(
472 &mut self,
473 schemata: Vec<&'s Schema>,
474 enclosing_namespace: &Namespace,
475 known_schemata: Option<&'n NamesRef<'n>>,
476 ) -> AvroResult<()> {
477 for schema in schemata {
478 match schema {
479 Schema::Array(schema) => {
480 self.resolve(vec![&schema.items], enclosing_namespace, known_schemata)?
481 }
482 Schema::Map(schema) => {
483 self.resolve(vec![&schema.types], enclosing_namespace, known_schemata)?
484 }
485 Schema::Union(UnionSchema { schemas, .. }) => {
486 for schema in schemas {
487 self.resolve(vec![schema], enclosing_namespace, known_schemata)?
488 }
489 }
490 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
491 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
492 if self
493 .names_ref
494 .insert(fully_qualified_name.clone(), schema)
495 .is_some()
496 {
497 return Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into());
498 }
499 }
500 Schema::Record(RecordSchema { name, fields, .. }) => {
501 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
502 if self
503 .names_ref
504 .insert(fully_qualified_name.clone(), schema)
505 .is_some()
506 {
507 return Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into());
508 } else {
509 let record_namespace = fully_qualified_name.namespace;
510 for field in fields {
511 self.resolve(vec![&field.schema], &record_namespace, known_schemata)?
512 }
513 }
514 }
515 Schema::Ref { name } => {
516 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
517 if !self.names_ref.contains_key(&fully_qualified_name) {
519 let is_resolved_with_known_schemas = known_schemata
520 .as_ref()
521 .map(|names| names.contains_key(&fully_qualified_name))
522 .unwrap_or(false);
523 if !is_resolved_with_known_schemas {
524 return Err(Details::SchemaResolutionError(fully_qualified_name).into());
525 }
526 }
527 }
528 _ => (),
529 }
530 }
531 Ok(())
532 }
533}
534
535pub(crate) struct ResolvedOwnedSchema {
536 names: Names,
537 root_schema: Schema,
538}
539
540impl TryFrom<Schema> for ResolvedOwnedSchema {
541 type Error = Error;
542
543 fn try_from(schema: Schema) -> AvroResult<Self> {
544 let names = HashMap::new();
545 let mut rs = ResolvedOwnedSchema {
546 names,
547 root_schema: schema,
548 };
549 resolve_names(&rs.root_schema, &mut rs.names, &None)?;
550 Ok(rs)
551 }
552}
553
554impl ResolvedOwnedSchema {
555 pub(crate) fn get_root_schema(&self) -> &Schema {
556 &self.root_schema
557 }
558 pub(crate) fn get_names(&self) -> &Names {
559 &self.names
560 }
561}
562
563pub(crate) fn resolve_names(
564 schema: &Schema,
565 names: &mut Names,
566 enclosing_namespace: &Namespace,
567) -> AvroResult<()> {
568 match schema {
569 Schema::Array(schema) => resolve_names(&schema.items, names, enclosing_namespace),
570 Schema::Map(schema) => resolve_names(&schema.types, names, enclosing_namespace),
571 Schema::Union(UnionSchema { schemas, .. }) => {
572 for schema in schemas {
573 resolve_names(schema, names, enclosing_namespace)?
574 }
575 Ok(())
576 }
577 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
578 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
579 if names
580 .insert(fully_qualified_name.clone(), schema.clone())
581 .is_some()
582 {
583 Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into())
584 } else {
585 Ok(())
586 }
587 }
588 Schema::Record(RecordSchema { name, fields, .. }) => {
589 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
590 if names
591 .insert(fully_qualified_name.clone(), schema.clone())
592 .is_some()
593 {
594 Err(Details::AmbiguousSchemaDefinition(fully_qualified_name).into())
595 } else {
596 let record_namespace = fully_qualified_name.namespace;
597 for field in fields {
598 resolve_names(&field.schema, names, &record_namespace)?
599 }
600 Ok(())
601 }
602 }
603 Schema::Ref { name } => {
604 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
605 names
606 .get(&fully_qualified_name)
607 .map(|_| ())
608 .ok_or_else(|| Details::SchemaResolutionError(fully_qualified_name).into())
609 }
610 _ => Ok(()),
611 }
612}
613
614pub(crate) fn resolve_names_with_schemata(
615 schemata: &Vec<&Schema>,
616 names: &mut Names,
617 enclosing_namespace: &Namespace,
618) -> AvroResult<()> {
619 for schema in schemata {
620 resolve_names(schema, names, enclosing_namespace)?;
621 }
622 Ok(())
623}
624
625#[derive(bon::Builder, Clone, Debug, PartialEq)]
627pub struct RecordField {
628 pub name: String,
630 #[builder(default)]
632 pub doc: Documentation,
633 pub aliases: Option<Vec<String>>,
635 pub default: Option<Value>,
639 pub schema: Schema,
641 #[builder(default = RecordFieldOrder::Ignore)]
645 pub order: RecordFieldOrder,
646 #[builder(default)]
648 pub position: usize,
649 #[builder(default = BTreeMap::new())]
651 pub custom_attributes: BTreeMap<String, Value>,
652}
653
654#[derive(Clone, Debug, Eq, PartialEq, EnumString)]
656#[strum(serialize_all = "kebab_case")]
657pub enum RecordFieldOrder {
658 Ascending,
659 Descending,
660 Ignore,
661}
662
663impl RecordField {
664 fn parse(
666 field: &Map<String, Value>,
667 position: usize,
668 parser: &mut Parser,
669 enclosing_record: &Name,
670 ) -> AvroResult<Self> {
671 let name = field.name().ok_or(Details::GetNameFieldFromRecord)?;
672
673 validate_record_field_name(&name)?;
674
675 let schema = parser.parse_complex(
677 field,
678 &enclosing_record.namespace,
679 RecordSchemaParseLocation::FromField,
680 )?;
681
682 let default = field.get("default").cloned();
683 Self::resolve_default_value(
684 &schema,
685 &name,
686 &enclosing_record.fullname(None),
687 &parser.parsed_schemas,
688 &default,
689 )?;
690
691 let aliases = field.get("aliases").and_then(|aliases| {
692 aliases.as_array().map(|aliases| {
693 aliases
694 .iter()
695 .flat_map(|alias| alias.as_str())
696 .map(|alias| alias.to_string())
697 .collect::<Vec<String>>()
698 })
699 });
700
701 let order = field
702 .get("order")
703 .and_then(|order| order.as_str())
704 .and_then(|order| RecordFieldOrder::from_str(order).ok())
705 .unwrap_or(RecordFieldOrder::Ascending);
706
707 Ok(RecordField {
708 name,
709 doc: field.doc(),
710 default,
711 aliases,
712 order,
713 position,
714 custom_attributes: RecordField::get_field_custom_attributes(field, &schema),
715 schema,
716 })
717 }
718
719 fn resolve_default_value(
720 field_schema: &Schema,
721 field_name: &str,
722 record_name: &str,
723 names: &Names,
724 default: &Option<Value>,
725 ) -> AvroResult<()> {
726 if let Some(value) = default {
727 let avro_value = types::Value::from(value.clone());
728 match field_schema {
729 Schema::Union(union_schema) => {
730 let schemas = &union_schema.schemas;
731 let resolved = schemas.iter().any(|schema| {
732 avro_value
733 .to_owned()
734 .resolve_internal(schema, names, &schema.namespace(), &None)
735 .is_ok()
736 });
737
738 if !resolved {
739 let schema: Option<&Schema> = schemas.first();
740 return match schema {
741 Some(first_schema) => Err(Details::GetDefaultUnion(
742 SchemaKind::from(first_schema),
743 types::ValueKind::from(avro_value),
744 )
745 .into()),
746 None => Err(Details::EmptyUnion.into()),
747 };
748 }
749 }
750 _ => {
751 let resolved = avro_value
752 .resolve_internal(field_schema, names, &field_schema.namespace(), &None)
753 .is_ok();
754
755 if !resolved {
756 return Err(Details::GetDefaultRecordField(
757 field_name.to_string(),
758 record_name.to_string(),
759 field_schema.canonical_form(),
760 )
761 .into());
762 }
763 }
764 };
765 }
766
767 Ok(())
768 }
769
770 fn get_field_custom_attributes(
771 field: &Map<String, Value>,
772 schema: &Schema,
773 ) -> BTreeMap<String, Value> {
774 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
775 for (key, value) in field {
776 match key.as_str() {
777 "type" | "name" | "doc" | "default" | "order" | "position" | "aliases"
778 | "logicalType" => continue,
779 key if key == "symbols" && matches!(schema, Schema::Enum(_)) => continue,
780 key if key == "size" && matches!(schema, Schema::Fixed(_)) => continue,
781 _ => custom_attributes.insert(key.clone(), value.clone()),
782 };
783 }
784 custom_attributes
785 }
786
787 pub fn is_nullable(&self) -> bool {
789 match self.schema {
790 Schema::Union(ref inner) => inner.is_nullable(),
791 _ => false,
792 }
793 }
794}
795
796#[derive(bon::Builder, Debug, Clone)]
798pub struct RecordSchema {
799 pub name: Name,
801 #[builder(default)]
803 pub aliases: Aliases,
804 #[builder(default)]
806 pub doc: Documentation,
807 pub fields: Vec<RecordField>,
809 pub lookup: BTreeMap<String, usize>,
812 #[builder(default = BTreeMap::new())]
814 pub attributes: BTreeMap<String, Value>,
815}
816
817#[derive(bon::Builder, Debug, Clone)]
819pub struct EnumSchema {
820 pub name: Name,
822 #[builder(default)]
824 pub aliases: Aliases,
825 #[builder(default)]
827 pub doc: Documentation,
828 pub symbols: Vec<String>,
830 pub default: Option<String>,
832 #[builder(default = BTreeMap::new())]
834 pub attributes: BTreeMap<String, Value>,
835}
836
837#[derive(bon::Builder, Debug, Clone)]
839pub struct FixedSchema {
840 pub name: Name,
842 #[builder(default)]
844 pub aliases: Aliases,
845 #[builder(default)]
847 pub doc: Documentation,
848 pub size: usize,
850 pub default: Option<String>,
852 #[builder(default = BTreeMap::new())]
854 pub attributes: BTreeMap<String, Value>,
855}
856
857impl FixedSchema {
858 fn serialize_to_map<S>(&self, mut map: S::SerializeMap) -> Result<S::SerializeMap, S::Error>
859 where
860 S: Serializer,
861 {
862 map.serialize_entry("type", "fixed")?;
863 if let Some(ref n) = self.name.namespace {
864 map.serialize_entry("namespace", n)?;
865 }
866 map.serialize_entry("name", &self.name.name)?;
867 if let Some(ref docstr) = self.doc {
868 map.serialize_entry("doc", docstr)?;
869 }
870 map.serialize_entry("size", &self.size)?;
871
872 if let Some(ref aliases) = self.aliases {
873 map.serialize_entry("aliases", aliases)?;
874 }
875
876 for attr in &self.attributes {
877 map.serialize_entry(attr.0, attr.1)?;
878 }
879
880 Ok(map)
881 }
882}
883
884#[derive(Debug, Clone)]
889pub struct DecimalSchema {
890 pub precision: DecimalMetadata,
892 pub scale: DecimalMetadata,
894 pub inner: Box<Schema>,
896}
897
898#[derive(Debug, Clone)]
900pub struct UnionSchema {
901 pub(crate) schemas: Vec<Schema>,
903 variant_index: BTreeMap<SchemaKind, usize>,
908}
909
910impl UnionSchema {
911 pub fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
913 let mut vindex = BTreeMap::new();
914 for (i, schema) in schemas.iter().enumerate() {
915 if let Schema::Union(_) = schema {
916 return Err(Details::GetNestedUnion.into());
917 }
918 let kind = SchemaKind::from(schema);
919 if !kind.is_named() && vindex.insert(kind, i).is_some() {
920 return Err(Details::GetUnionDuplicate.into());
921 }
922 }
923 Ok(UnionSchema {
924 schemas,
925 variant_index: vindex,
926 })
927 }
928
929 pub fn variants(&self) -> &[Schema] {
931 &self.schemas
932 }
933
934 pub fn is_nullable(&self) -> bool {
936 self.schemas.iter().any(|x| matches!(x, Schema::Null))
937 }
938
939 pub fn find_schema_with_known_schemata<S: Borrow<Schema> + Debug>(
945 &self,
946 value: &types::Value,
947 known_schemata: Option<&HashMap<Name, S>>,
948 enclosing_namespace: &Namespace,
949 ) -> Option<(usize, &Schema)> {
950 let schema_kind = SchemaKind::from(value);
951 if let Some(&i) = self.variant_index.get(&schema_kind) {
952 Some((i, &self.schemas[i]))
954 } else {
955 let mut collected_names: HashMap<Name, &Schema> = known_schemata
959 .map(|names| {
960 names
961 .iter()
962 .map(|(name, schema)| (name.clone(), schema.borrow()))
963 .collect()
964 })
965 .unwrap_or_default();
966
967 self.schemas.iter().enumerate().find(|(_, schema)| {
968 let resolved_schema = ResolvedSchema::new_with_known_schemata(
969 vec![*schema],
970 enclosing_namespace,
971 &collected_names,
972 )
973 .expect("Schema didn't successfully parse");
974 let resolved_names = resolved_schema.names_ref;
975
976 collected_names.extend(resolved_names);
978 let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());
979
980 value
981 .clone()
982 .resolve_internal(schema, &collected_names, namespace, &None)
983 .is_ok()
984 })
985 }
986 }
987}
988
989impl PartialEq for UnionSchema {
991 fn eq(&self, other: &UnionSchema) -> bool {
992 self.schemas.eq(&other.schemas)
993 }
994}
995
996type DecimalMetadata = usize;
997pub(crate) type Precision = DecimalMetadata;
998pub(crate) type Scale = DecimalMetadata;
999
1000fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
1001 Ok(if value.is_u64() {
1002 let num = value
1003 .as_u64()
1004 .ok_or_else(|| Details::GetU64FromJson(value.clone()))?;
1005 num.try_into()
1006 .map_err(|e| Details::ConvertU64ToUsize(e, num))?
1007 } else if value.is_i64() {
1008 let num = value
1009 .as_i64()
1010 .ok_or_else(|| Details::GetI64FromJson(value.clone()))?;
1011 num.try_into()
1012 .map_err(|e| Details::ConvertI64ToUsize(e, num))?
1013 } else {
1014 return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
1015 })
1016}
1017
1018#[derive(Debug, Default)]
1019enum RecordSchemaParseLocation {
1020 #[default]
1022 Root,
1023
1024 FromField,
1026}
1027
1028#[derive(Default)]
1029struct Parser {
1030 input_schemas: HashMap<Name, Value>,
1031 resolving_schemas: Names,
1035 input_order: Vec<Name>,
1036 parsed_schemas: Names,
1039}
1040
1041impl Schema {
1042 pub fn canonical_form(&self) -> String {
1047 let json = serde_json::to_value(self)
1048 .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
1049 let mut defined_names = HashSet::new();
1050 parsing_canonical_form(&json, &mut defined_names)
1051 }
1052
1053 pub fn independent_canonical_form(&self, schemata: &[Schema]) -> Result<String, Error> {
1059 let mut this = self.clone();
1060 this.denormalize(schemata)?;
1061 Ok(this.canonical_form())
1062 }
1063
1064 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1071 let mut d = D::new();
1072 d.update(self.canonical_form());
1073 SchemaFingerprint {
1074 bytes: d.finalize().to_vec(),
1075 }
1076 }
1077
1078 pub fn parse_str(input: &str) -> Result<Schema, Error> {
1080 let mut parser = Parser::default();
1081 parser.parse_str(input)
1082 }
1083
1084 pub fn parse_list(input: impl IntoIterator<Item = impl AsRef<str>>) -> AvroResult<Vec<Schema>> {
1092 let input = input.into_iter();
1093 let input_len = input.size_hint().0;
1094 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input_len);
1095 let mut input_order: Vec<Name> = Vec::with_capacity(input_len);
1096 for json in input {
1097 let json = json.as_ref();
1098 let schema: Value = serde_json::from_str(json).map_err(Details::ParseSchemaJson)?;
1099 if let Value::Object(inner) = &schema {
1100 let name = Name::parse(inner, &None)?;
1101 let previous_value = input_schemas.insert(name.clone(), schema);
1102 if previous_value.is_some() {
1103 return Err(Details::NameCollision(name.fullname(None)).into());
1104 }
1105 input_order.push(name);
1106 } else {
1107 return Err(Details::GetNameField.into());
1108 }
1109 }
1110 let mut parser = Parser {
1111 input_schemas,
1112 resolving_schemas: HashMap::default(),
1113 input_order,
1114 parsed_schemas: HashMap::with_capacity(input_len),
1115 };
1116 parser.parse_list()
1117 }
1118
1119 pub fn parse_str_with_list(
1132 schema: &str,
1133 schemata: impl IntoIterator<Item = impl AsRef<str>>,
1134 ) -> AvroResult<(Schema, Vec<Schema>)> {
1135 let schemata = schemata.into_iter();
1136 let schemata_len = schemata.size_hint().0;
1137 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(schemata_len);
1138 let mut input_order: Vec<Name> = Vec::with_capacity(schemata_len);
1139 for json in schemata {
1140 let json = json.as_ref();
1141 let schema: Value = serde_json::from_str(json).map_err(Details::ParseSchemaJson)?;
1142 if let Value::Object(inner) = &schema {
1143 let name = Name::parse(inner, &None)?;
1144 if let Some(_previous) = input_schemas.insert(name.clone(), schema) {
1145 return Err(Details::NameCollision(name.fullname(None)).into());
1146 }
1147 input_order.push(name);
1148 } else {
1149 return Err(Details::GetNameField.into());
1150 }
1151 }
1152 let mut parser = Parser {
1153 input_schemas,
1154 resolving_schemas: HashMap::default(),
1155 input_order,
1156 parsed_schemas: HashMap::with_capacity(schemata_len),
1157 };
1158 parser.parse_input_schemas()?;
1159
1160 let value = serde_json::from_str(schema).map_err(Details::ParseSchemaJson)?;
1161 let schema = parser.parse(&value, &None)?;
1162 let schemata = parser.parse_list()?;
1163 Ok((schema, schemata))
1164 }
1165
1166 pub fn parse_reader(reader: &mut (impl Read + ?Sized)) -> AvroResult<Schema> {
1168 let mut buf = String::new();
1169 match reader.read_to_string(&mut buf) {
1170 Ok(_) => Self::parse_str(&buf),
1171 Err(e) => Err(Details::ReadSchemaFromReader(e).into()),
1172 }
1173 }
1174
1175 pub fn parse(value: &Value) -> AvroResult<Schema> {
1177 let mut parser = Parser::default();
1178 parser.parse(value, &None)
1179 }
1180
1181 pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
1184 let mut parser = Parser {
1185 input_schemas: HashMap::with_capacity(1),
1186 resolving_schemas: Names::default(),
1187 input_order: Vec::with_capacity(1),
1188 parsed_schemas: names,
1189 };
1190 parser.parse(value, &None)
1191 }
1192
1193 pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
1195 match self {
1196 Schema::Record(RecordSchema { attributes, .. })
1197 | Schema::Enum(EnumSchema { attributes, .. })
1198 | Schema::Fixed(FixedSchema { attributes, .. })
1199 | Schema::Array(ArraySchema { attributes, .. })
1200 | Schema::Map(MapSchema { attributes, .. }) => Some(attributes),
1201 _ => None,
1202 }
1203 }
1204
1205 pub fn name(&self) -> Option<&Name> {
1207 match self {
1208 Schema::Ref { name, .. }
1209 | Schema::Record(RecordSchema { name, .. })
1210 | Schema::Enum(EnumSchema { name, .. })
1211 | Schema::Fixed(FixedSchema { name, .. }) => Some(name),
1212 _ => None,
1213 }
1214 }
1215
1216 pub fn namespace(&self) -> Namespace {
1218 self.name().and_then(|n| n.namespace.clone())
1219 }
1220
1221 pub fn aliases(&self) -> Option<&Vec<Alias>> {
1223 match self {
1224 Schema::Record(RecordSchema { aliases, .. })
1225 | Schema::Enum(EnumSchema { aliases, .. })
1226 | Schema::Fixed(FixedSchema { aliases, .. }) => aliases.as_ref(),
1227 _ => None,
1228 }
1229 }
1230
1231 pub fn doc(&self) -> Option<&String> {
1233 match self {
1234 Schema::Record(RecordSchema { doc, .. })
1235 | Schema::Enum(EnumSchema { doc, .. })
1236 | Schema::Fixed(FixedSchema { doc, .. }) => doc.as_ref(),
1237 _ => None,
1238 }
1239 }
1240
1241 pub fn map(types: Schema) -> Self {
1243 Schema::Map(MapSchema {
1244 types: Box::new(types),
1245 attributes: Default::default(),
1246 })
1247 }
1248
1249 pub fn map_with_attributes(types: Schema, attributes: BTreeMap<String, Value>) -> Self {
1251 Schema::Map(MapSchema {
1252 types: Box::new(types),
1253 attributes,
1254 })
1255 }
1256
1257 pub fn array(items: Schema) -> Self {
1259 Schema::Array(ArraySchema {
1260 items: Box::new(items),
1261 attributes: Default::default(),
1262 })
1263 }
1264
1265 pub fn array_with_attributes(items: Schema, attributes: BTreeMap<String, Value>) -> Self {
1267 Schema::Array(ArraySchema {
1268 items: Box::new(items),
1269 attributes,
1270 })
1271 }
1272
1273 fn denormalize(&mut self, schemata: &[Schema]) -> AvroResult<()> {
1274 match self {
1275 Schema::Ref { name } => {
1276 let replacement_schema = schemata
1277 .iter()
1278 .find(|s| s.name().map(|n| *n == *name).unwrap_or(false));
1279 if let Some(schema) = replacement_schema {
1280 let mut denorm = schema.clone();
1281 denorm.denormalize(schemata)?;
1282 *self = denorm;
1283 } else {
1284 return Err(Details::SchemaResolutionError(name.clone()).into());
1285 }
1286 }
1287 Schema::Record(record_schema) => {
1288 for field in &mut record_schema.fields {
1289 field.schema.denormalize(schemata)?;
1290 }
1291 }
1292 Schema::Array(array_schema) => {
1293 array_schema.items.denormalize(schemata)?;
1294 }
1295 Schema::Map(map_schema) => {
1296 map_schema.types.denormalize(schemata)?;
1297 }
1298 Schema::Union(union_schema) => {
1299 for schema in &mut union_schema.schemas {
1300 schema.denormalize(schemata)?;
1301 }
1302 }
1303 _ => (),
1304 }
1305 Ok(())
1306 }
1307}
1308
1309impl Parser {
1310 fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
1312 let value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
1313 self.parse(&value, &None)
1314 }
1315
1316 fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
1319 self.parse_input_schemas()?;
1320
1321 let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
1322 for name in self.input_order.drain(0..) {
1323 let parsed = self
1324 .parsed_schemas
1325 .remove(&name)
1326 .expect("One of the input schemas was unexpectedly not parsed");
1327 parsed_schemas.push(parsed);
1328 }
1329 Ok(parsed_schemas)
1330 }
1331
1332 fn parse_input_schemas(&mut self) -> Result<(), Error> {
1334 while !self.input_schemas.is_empty() {
1335 let next_name = self
1336 .input_schemas
1337 .keys()
1338 .next()
1339 .expect("Input schemas unexpectedly empty")
1340 .to_owned();
1341 let (name, value) = self
1342 .input_schemas
1343 .remove_entry(&next_name)
1344 .expect("Key unexpectedly missing");
1345 let parsed = self.parse(&value, &None)?;
1346 self.parsed_schemas
1347 .insert(get_schema_type_name(name, value), parsed);
1348 }
1349 Ok(())
1350 }
1351
1352 fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
1355 match *value {
1356 Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
1357 Value::Object(ref data) => {
1358 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1359 }
1360 Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
1361 _ => Err(Details::ParseSchemaFromValidJson.into()),
1362 }
1363 }
1364
1365 fn parse_known_schema(
1369 &mut self,
1370 name: &str,
1371 enclosing_namespace: &Namespace,
1372 ) -> AvroResult<Schema> {
1373 match name {
1374 "null" => Ok(Schema::Null),
1375 "boolean" => Ok(Schema::Boolean),
1376 "int" => Ok(Schema::Int),
1377 "long" => Ok(Schema::Long),
1378 "double" => Ok(Schema::Double),
1379 "float" => Ok(Schema::Float),
1380 "bytes" => Ok(Schema::Bytes),
1381 "string" => Ok(Schema::String),
1382 _ => self.fetch_schema_ref(name, enclosing_namespace),
1383 }
1384 }
1385
1386 fn fetch_schema_ref(
1396 &mut self,
1397 name: &str,
1398 enclosing_namespace: &Namespace,
1399 ) -> AvroResult<Schema> {
1400 fn get_schema_ref(parsed: &Schema) -> Schema {
1401 match parsed {
1402 &Schema::Record(RecordSchema { ref name, .. })
1403 | &Schema::Enum(EnumSchema { ref name, .. })
1404 | &Schema::Fixed(FixedSchema { ref name, .. }) => {
1405 Schema::Ref { name: name.clone() }
1406 }
1407 _ => parsed.clone(),
1408 }
1409 }
1410
1411 let name = Name::new(name)?;
1412 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
1413
1414 if self.parsed_schemas.contains_key(&fully_qualified_name) {
1415 return Ok(Schema::Ref {
1416 name: fully_qualified_name,
1417 });
1418 }
1419 if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
1420 return Ok(resolving_schema.clone());
1421 }
1422
1423 match name.name.as_str() {
1425 "record" | "enum" | "fixed" => {
1426 return Err(Details::InvalidSchemaRecord(name.to_string()).into());
1427 }
1428 _ => (),
1429 }
1430
1431 let value = self
1432 .input_schemas
1433 .remove(&fully_qualified_name)
1434 .ok_or_else(|| Details::ParsePrimitive(fully_qualified_name.fullname(None)))?;
1436
1437 let parsed = self.parse(&value, &None)?;
1439 self.parsed_schemas
1440 .insert(get_schema_type_name(name, value), parsed.clone());
1441
1442 Ok(get_schema_ref(&parsed))
1443 }
1444
1445 fn parse_precision_and_scale(
1446 complex: &Map<String, Value>,
1447 ) -> Result<(Precision, Scale), Error> {
1448 fn get_decimal_integer(
1449 complex: &Map<String, Value>,
1450 key: &'static str,
1451 ) -> Result<DecimalMetadata, Error> {
1452 match complex.get(key) {
1453 Some(Value::Number(value)) => parse_json_integer_for_decimal(value),
1454 None => {
1455 if key == "scale" {
1456 Ok(0)
1457 } else {
1458 Err(Details::GetDecimalMetadataFromJson(key).into())
1459 }
1460 }
1461 Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
1462 key: key.into(),
1463 value: value.clone(),
1464 }
1465 .into()),
1466 }
1467 }
1468 let precision = get_decimal_integer(complex, "precision")?;
1469 let scale = get_decimal_integer(complex, "scale")?;
1470
1471 if precision < 1 {
1472 return Err(Details::DecimalPrecisionMuBePositive { precision }.into());
1473 }
1474
1475 if precision < scale {
1476 Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into())
1477 } else {
1478 Ok((precision, scale))
1479 }
1480 }
1481
1482 fn parse_complex(
1488 &mut self,
1489 complex: &Map<String, Value>,
1490 enclosing_namespace: &Namespace,
1491 parse_location: RecordSchemaParseLocation,
1492 ) -> AvroResult<Schema> {
1493 fn parse_as_native_complex(
1495 complex: &Map<String, Value>,
1496 parser: &mut Parser,
1497 enclosing_namespace: &Namespace,
1498 ) -> AvroResult<Schema> {
1499 match complex.get("type") {
1500 Some(value) => match value {
1501 Value::String(s) if s == "fixed" => {
1502 parser.parse_fixed(complex, enclosing_namespace)
1503 }
1504 _ => parser.parse(value, enclosing_namespace),
1505 },
1506 None => Err(Details::GetLogicalTypeField.into()),
1507 }
1508 }
1509
1510 fn try_convert_to_logical_type<F>(
1517 logical_type: &str,
1518 schema: Schema,
1519 supported_schema_kinds: &[SchemaKind],
1520 convert: F,
1521 ) -> AvroResult<Schema>
1522 where
1523 F: Fn(Schema) -> AvroResult<Schema>,
1524 {
1525 let kind = SchemaKind::from(schema.clone());
1526 if supported_schema_kinds.contains(&kind) {
1527 convert(schema)
1528 } else {
1529 warn!(
1530 "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!"
1531 );
1532 Ok(schema)
1533 }
1534 }
1535
1536 match complex.get("logicalType") {
1537 Some(Value::String(t)) => match t.as_str() {
1538 "decimal" => {
1539 return try_convert_to_logical_type(
1540 "decimal",
1541 parse_as_native_complex(complex, self, enclosing_namespace)?,
1542 &[SchemaKind::Fixed, SchemaKind::Bytes],
1543 |inner| -> AvroResult<Schema> {
1544 match Self::parse_precision_and_scale(complex) {
1545 Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
1546 precision,
1547 scale,
1548 inner: Box::new(inner),
1549 })),
1550 Err(err) => {
1551 warn!("Ignoring invalid decimal logical type: {err}");
1552 Ok(inner)
1553 }
1554 }
1555 },
1556 );
1557 }
1558 "big-decimal" => {
1559 return try_convert_to_logical_type(
1560 "big-decimal",
1561 parse_as_native_complex(complex, self, enclosing_namespace)?,
1562 &[SchemaKind::Bytes],
1563 |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
1564 );
1565 }
1566 "uuid" => {
1567 return try_convert_to_logical_type(
1568 "uuid",
1569 parse_as_native_complex(complex, self, enclosing_namespace)?,
1570 &[SchemaKind::String, SchemaKind::Fixed],
1571 |schema| match schema {
1572 Schema::String => Ok(Schema::Uuid),
1573 Schema::Fixed(FixedSchema { size: 16, .. }) => Ok(Schema::Uuid),
1574 Schema::Fixed(FixedSchema { size, .. }) => {
1575 warn!(
1576 "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {schema:?}"
1577 );
1578 Ok(schema)
1579 }
1580 _ => {
1581 warn!("Ignoring invalid uuid logical type for schema: {schema:?}");
1582 Ok(schema)
1583 }
1584 },
1585 );
1586 }
1587 "date" => {
1588 return try_convert_to_logical_type(
1589 "date",
1590 parse_as_native_complex(complex, self, enclosing_namespace)?,
1591 &[SchemaKind::Int],
1592 |_| -> AvroResult<Schema> { Ok(Schema::Date) },
1593 );
1594 }
1595 "time-millis" => {
1596 return try_convert_to_logical_type(
1597 "date",
1598 parse_as_native_complex(complex, self, enclosing_namespace)?,
1599 &[SchemaKind::Int],
1600 |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
1601 );
1602 }
1603 "time-micros" => {
1604 return try_convert_to_logical_type(
1605 "time-micros",
1606 parse_as_native_complex(complex, self, enclosing_namespace)?,
1607 &[SchemaKind::Long],
1608 |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
1609 );
1610 }
1611 "timestamp-millis" => {
1612 return try_convert_to_logical_type(
1613 "timestamp-millis",
1614 parse_as_native_complex(complex, self, enclosing_namespace)?,
1615 &[SchemaKind::Long],
1616 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
1617 );
1618 }
1619 "timestamp-micros" => {
1620 return try_convert_to_logical_type(
1621 "timestamp-micros",
1622 parse_as_native_complex(complex, self, enclosing_namespace)?,
1623 &[SchemaKind::Long],
1624 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
1625 );
1626 }
1627 "timestamp-nanos" => {
1628 return try_convert_to_logical_type(
1629 "timestamp-nanos",
1630 parse_as_native_complex(complex, self, enclosing_namespace)?,
1631 &[SchemaKind::Long],
1632 |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
1633 );
1634 }
1635 "local-timestamp-millis" => {
1636 return try_convert_to_logical_type(
1637 "local-timestamp-millis",
1638 parse_as_native_complex(complex, self, enclosing_namespace)?,
1639 &[SchemaKind::Long],
1640 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
1641 );
1642 }
1643 "local-timestamp-micros" => {
1644 return try_convert_to_logical_type(
1645 "local-timestamp-micros",
1646 parse_as_native_complex(complex, self, enclosing_namespace)?,
1647 &[SchemaKind::Long],
1648 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
1649 );
1650 }
1651 "local-timestamp-nanos" => {
1652 return try_convert_to_logical_type(
1653 "local-timestamp-nanos",
1654 parse_as_native_complex(complex, self, enclosing_namespace)?,
1655 &[SchemaKind::Long],
1656 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
1657 );
1658 }
1659 "duration" => {
1660 return try_convert_to_logical_type(
1661 "duration",
1662 parse_as_native_complex(complex, self, enclosing_namespace)?,
1663 &[SchemaKind::Fixed],
1664 |_| -> AvroResult<Schema> { Ok(Schema::Duration) },
1665 );
1666 }
1667 _ => {}
1670 },
1671 Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
1675 _ => {}
1676 }
1677 match complex.get("type") {
1678 Some(Value::String(t)) => match t.as_str() {
1679 "record" => match parse_location {
1680 RecordSchemaParseLocation::Root => {
1681 self.parse_record(complex, enclosing_namespace)
1682 }
1683 RecordSchemaParseLocation::FromField => {
1684 self.fetch_schema_ref(t, enclosing_namespace)
1685 }
1686 },
1687 "enum" => self.parse_enum(complex, enclosing_namespace),
1688 "array" => self.parse_array(complex, enclosing_namespace),
1689 "map" => self.parse_map(complex, enclosing_namespace),
1690 "fixed" => self.parse_fixed(complex, enclosing_namespace),
1691 other => self.parse_known_schema(other, enclosing_namespace),
1692 },
1693 Some(Value::Object(data)) => {
1694 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1695 }
1696 Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
1697 Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()),
1698 None => Err(Details::GetComplexTypeField.into()),
1699 }
1700 }
1701
1702 fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
1703 let resolving_schema = Schema::Ref { name: name.clone() };
1704 self.resolving_schemas
1705 .insert(name.clone(), resolving_schema.clone());
1706
1707 let namespace = &name.namespace;
1708
1709 if let Some(aliases) = aliases {
1710 aliases.iter().for_each(|alias| {
1711 let alias_fullname = alias.fully_qualified_name(namespace);
1712 self.resolving_schemas
1713 .insert(alias_fullname, resolving_schema.clone());
1714 });
1715 }
1716 }
1717
1718 fn register_parsed_schema(
1719 &mut self,
1720 fully_qualified_name: &Name,
1721 schema: &Schema,
1722 aliases: &Aliases,
1723 ) {
1724 self.parsed_schemas
1727 .insert(fully_qualified_name.clone(), schema.clone());
1728 self.resolving_schemas.remove(fully_qualified_name);
1729
1730 let namespace = &fully_qualified_name.namespace;
1731
1732 if let Some(aliases) = aliases {
1733 aliases.iter().for_each(|alias| {
1734 let alias_fullname = alias.fully_qualified_name(namespace);
1735 self.resolving_schemas.remove(&alias_fullname);
1736 self.parsed_schemas.insert(alias_fullname, schema.clone());
1737 });
1738 }
1739 }
1740
1741 fn get_already_seen_schema(
1743 &self,
1744 complex: &Map<String, Value>,
1745 enclosing_namespace: &Namespace,
1746 ) -> Option<&Schema> {
1747 match complex.get("type") {
1748 Some(Value::String(typ)) => {
1749 let name = Name::new(typ.as_str())
1750 .unwrap()
1751 .fully_qualified_name(enclosing_namespace);
1752 self.resolving_schemas
1753 .get(&name)
1754 .or_else(|| self.parsed_schemas.get(&name))
1755 }
1756 _ => None,
1757 }
1758 }
1759
1760 fn parse_record(
1763 &mut self,
1764 complex: &Map<String, Value>,
1765 enclosing_namespace: &Namespace,
1766 ) -> AvroResult<Schema> {
1767 let fields_opt = complex.get("fields");
1768
1769 if fields_opt.is_none() {
1770 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1771 return Ok(seen.clone());
1772 }
1773 }
1774
1775 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1776 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1777
1778 let mut lookup = BTreeMap::new();
1779
1780 self.register_resolving_schema(&fully_qualified_name, &aliases);
1781
1782 debug!("Going to parse record schema: {:?}", &fully_qualified_name);
1783
1784 let fields: Vec<RecordField> = fields_opt
1785 .and_then(|fields| fields.as_array())
1786 .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
1787 .and_then(|fields| {
1788 fields
1789 .iter()
1790 .filter_map(|field| field.as_object())
1791 .enumerate()
1792 .map(|(position, field)| {
1793 RecordField::parse(field, position, self, &fully_qualified_name)
1794 })
1795 .collect::<Result<_, _>>()
1796 })?;
1797
1798 for field in &fields {
1799 if let Some(_old) = lookup.insert(field.name.clone(), field.position) {
1800 return Err(Details::FieldNameDuplicate(field.name.clone()).into());
1801 }
1802
1803 if let Some(ref field_aliases) = field.aliases {
1804 for alias in field_aliases {
1805 lookup.insert(alias.clone(), field.position);
1806 }
1807 }
1808 }
1809
1810 let schema = Schema::Record(RecordSchema {
1811 name: fully_qualified_name.clone(),
1812 aliases: aliases.clone(),
1813 doc: complex.doc(),
1814 fields,
1815 lookup,
1816 attributes: self.get_custom_attributes(complex, vec!["fields"]),
1817 });
1818
1819 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1820 Ok(schema)
1821 }
1822
1823 fn get_custom_attributes(
1824 &self,
1825 complex: &Map<String, Value>,
1826 excluded: Vec<&'static str>,
1827 ) -> BTreeMap<String, Value> {
1828 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
1829 for (key, value) in complex {
1830 match key.as_str() {
1831 "type" | "name" | "namespace" | "doc" | "aliases" => continue,
1832 candidate if excluded.contains(&candidate) => continue,
1833 _ => custom_attributes.insert(key.clone(), value.clone()),
1834 };
1835 }
1836 custom_attributes
1837 }
1838
1839 fn parse_enum(
1842 &mut self,
1843 complex: &Map<String, Value>,
1844 enclosing_namespace: &Namespace,
1845 ) -> AvroResult<Schema> {
1846 let symbols_opt = complex.get("symbols");
1847
1848 if symbols_opt.is_none() {
1849 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1850 return Ok(seen.clone());
1851 }
1852 }
1853
1854 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1855 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1856
1857 let symbols: Vec<String> = symbols_opt
1858 .and_then(|v| v.as_array())
1859 .ok_or(Details::GetEnumSymbolsField)
1860 .and_then(|symbols| {
1861 symbols
1862 .iter()
1863 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1864 .collect::<Option<_>>()
1865 .ok_or(Details::GetEnumSymbols)
1866 })?;
1867
1868 let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
1869 for symbol in symbols.iter() {
1870 validate_enum_symbol_name(symbol)?;
1871
1872 if existing_symbols.contains(&symbol) {
1874 return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
1875 }
1876
1877 existing_symbols.insert(symbol);
1878 }
1879
1880 let mut default: Option<String> = None;
1881 if let Some(value) = complex.get("default") {
1882 if let Value::String(ref s) = *value {
1883 default = Some(s.clone());
1884 } else {
1885 return Err(Details::EnumDefaultWrongType(value.clone()).into());
1886 }
1887 }
1888
1889 if let Some(ref value) = default {
1890 let resolved = types::Value::from(value.clone())
1891 .resolve_enum(&symbols, &Some(value.to_string()), &None)
1892 .is_ok();
1893 if !resolved {
1894 return Err(Details::GetEnumDefault {
1895 symbol: value.to_string(),
1896 symbols,
1897 }
1898 .into());
1899 }
1900 }
1901
1902 let schema = Schema::Enum(EnumSchema {
1903 name: fully_qualified_name.clone(),
1904 aliases: aliases.clone(),
1905 doc: complex.doc(),
1906 symbols,
1907 default,
1908 attributes: self.get_custom_attributes(complex, vec!["symbols"]),
1909 });
1910
1911 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1912
1913 Ok(schema)
1914 }
1915
1916 fn parse_array(
1919 &mut self,
1920 complex: &Map<String, Value>,
1921 enclosing_namespace: &Namespace,
1922 ) -> AvroResult<Schema> {
1923 complex
1924 .get("items")
1925 .ok_or_else(|| Details::GetArrayItemsField.into())
1926 .and_then(|items| self.parse(items, enclosing_namespace))
1927 .map(|items| {
1928 Schema::array_with_attributes(
1929 items,
1930 self.get_custom_attributes(complex, vec!["items"]),
1931 )
1932 })
1933 }
1934
1935 fn parse_map(
1938 &mut self,
1939 complex: &Map<String, Value>,
1940 enclosing_namespace: &Namespace,
1941 ) -> AvroResult<Schema> {
1942 complex
1943 .get("values")
1944 .ok_or_else(|| Details::GetMapValuesField.into())
1945 .and_then(|items| self.parse(items, enclosing_namespace))
1946 .map(|items| {
1947 Schema::map_with_attributes(
1948 items,
1949 self.get_custom_attributes(complex, vec!["values"]),
1950 )
1951 })
1952 }
1953
1954 fn parse_union(
1957 &mut self,
1958 items: &[Value],
1959 enclosing_namespace: &Namespace,
1960 ) -> AvroResult<Schema> {
1961 items
1962 .iter()
1963 .map(|v| self.parse(v, enclosing_namespace))
1964 .collect::<Result<Vec<_>, _>>()
1965 .and_then(|schemas| {
1966 if schemas.is_empty() {
1967 error!(
1968 "Union schemas should have at least two members! \
1969 Please enable debug logging to find out which Record schema \
1970 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1971 );
1972 } else if schemas.len() == 1 {
1973 warn!(
1974 "Union schema with just one member! Consider dropping the union! \
1975 Please enable debug logging to find out which Record schema \
1976 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1977 );
1978 }
1979 Ok(Schema::Union(UnionSchema::new(schemas)?))
1980 })
1981 }
1982
1983 fn parse_fixed(
1986 &mut self,
1987 complex: &Map<String, Value>,
1988 enclosing_namespace: &Namespace,
1989 ) -> AvroResult<Schema> {
1990 let size_opt = complex.get("size");
1991 if size_opt.is_none() {
1992 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1993 return Ok(seen.clone());
1994 }
1995 }
1996
1997 let doc = complex.get("doc").and_then(|v| match &v {
1998 &Value::String(docstr) => Some(docstr.clone()),
1999 _ => None,
2000 });
2001
2002 let size = match size_opt {
2003 Some(size) => size
2004 .as_u64()
2005 .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())),
2006 None => Err(Details::GetFixedSizeField),
2007 }?;
2008
2009 let default = complex.get("default").and_then(|v| match &v {
2010 &Value::String(default) => Some(default.clone()),
2011 _ => None,
2012 });
2013
2014 if default.is_some() {
2015 let len = default.clone().unwrap().len();
2016 if len != size as usize {
2017 return Err(Details::FixedDefaultLenSizeMismatch(len, size).into());
2018 }
2019 }
2020
2021 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
2022 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
2023
2024 let schema = Schema::Fixed(FixedSchema {
2025 name: fully_qualified_name.clone(),
2026 aliases: aliases.clone(),
2027 doc,
2028 size: size as usize,
2029 default,
2030 attributes: self.get_custom_attributes(complex, vec!["size"]),
2031 });
2032
2033 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
2034
2035 Ok(schema)
2036 }
2037}
2038
2039fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases {
2045 aliases.map(|aliases| {
2046 aliases
2047 .iter()
2048 .map(|alias| {
2049 if alias.find('.').is_none() {
2050 match namespace {
2051 Some(ns) => format!("{ns}.{alias}"),
2052 None => alias.clone(),
2053 }
2054 } else {
2055 alias.clone()
2056 }
2057 })
2058 .map(|alias| Alias::new(alias.as_str()).unwrap())
2059 .collect()
2060 })
2061}
2062
2063fn get_schema_type_name(name: Name, value: Value) -> Name {
2064 match value.get("type") {
2065 Some(Value::Object(complex_type)) => match complex_type.name() {
2066 Some(name) => Name::new(name.as_str()).unwrap(),
2067 _ => name,
2068 },
2069 _ => name,
2070 }
2071}
2072
2073impl Serialize for Schema {
2074 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2075 where
2076 S: Serializer,
2077 {
2078 match *self {
2079 Schema::Ref { ref name } => serializer.serialize_str(&name.fullname(None)),
2080 Schema::Null => serializer.serialize_str("null"),
2081 Schema::Boolean => serializer.serialize_str("boolean"),
2082 Schema::Int => serializer.serialize_str("int"),
2083 Schema::Long => serializer.serialize_str("long"),
2084 Schema::Float => serializer.serialize_str("float"),
2085 Schema::Double => serializer.serialize_str("double"),
2086 Schema::Bytes => serializer.serialize_str("bytes"),
2087 Schema::String => serializer.serialize_str("string"),
2088 Schema::Array(ref inner) => {
2089 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2090 map.serialize_entry("type", "array")?;
2091 map.serialize_entry("items", &*inner.items.clone())?;
2092 for attr in &inner.attributes {
2093 map.serialize_entry(attr.0, attr.1)?;
2094 }
2095 map.end()
2096 }
2097 Schema::Map(ref inner) => {
2098 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2099 map.serialize_entry("type", "map")?;
2100 map.serialize_entry("values", &*inner.types.clone())?;
2101 for attr in &inner.attributes {
2102 map.serialize_entry(attr.0, attr.1)?;
2103 }
2104 map.end()
2105 }
2106 Schema::Union(ref inner) => {
2107 let variants = inner.variants();
2108 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2109 for v in variants {
2110 seq.serialize_element(v)?;
2111 }
2112 seq.end()
2113 }
2114 Schema::Record(RecordSchema {
2115 ref name,
2116 ref aliases,
2117 ref doc,
2118 ref fields,
2119 ref attributes,
2120 ..
2121 }) => {
2122 let mut map = serializer.serialize_map(None)?;
2123 map.serialize_entry("type", "record")?;
2124 if let Some(ref n) = name.namespace {
2125 map.serialize_entry("namespace", n)?;
2126 }
2127 map.serialize_entry("name", &name.name)?;
2128 if let Some(docstr) = doc {
2129 map.serialize_entry("doc", docstr)?;
2130 }
2131 if let Some(aliases) = aliases {
2132 map.serialize_entry("aliases", aliases)?;
2133 }
2134 map.serialize_entry("fields", fields)?;
2135 for attr in attributes {
2136 map.serialize_entry(attr.0, attr.1)?;
2137 }
2138 map.end()
2139 }
2140 Schema::Enum(EnumSchema {
2141 ref name,
2142 ref symbols,
2143 ref aliases,
2144 ref attributes,
2145 ..
2146 }) => {
2147 let mut map = serializer.serialize_map(None)?;
2148 map.serialize_entry("type", "enum")?;
2149 if let Some(ref n) = name.namespace {
2150 map.serialize_entry("namespace", n)?;
2151 }
2152 map.serialize_entry("name", &name.name)?;
2153 map.serialize_entry("symbols", symbols)?;
2154
2155 if let Some(aliases) = aliases {
2156 map.serialize_entry("aliases", aliases)?;
2157 }
2158 for attr in attributes {
2159 map.serialize_entry(attr.0, attr.1)?;
2160 }
2161 map.end()
2162 }
2163 Schema::Fixed(ref fixed_schema) => {
2164 let mut map = serializer.serialize_map(None)?;
2165 map = fixed_schema.serialize_to_map::<S>(map)?;
2166 map.end()
2167 }
2168 Schema::Decimal(DecimalSchema {
2169 ref scale,
2170 ref precision,
2171 ref inner,
2172 }) => {
2173 let mut map = serializer.serialize_map(None)?;
2174 match inner.as_ref() {
2175 Schema::Fixed(fixed_schema) => {
2176 map = fixed_schema.serialize_to_map::<S>(map)?;
2177 }
2178 Schema::Bytes => {
2179 map.serialize_entry("type", "bytes")?;
2180 }
2181 others => {
2182 return Err(serde::ser::Error::custom(format!(
2183 "DecimalSchema inner type must be Fixed or Bytes, got {:?}",
2184 SchemaKind::from(others)
2185 )));
2186 }
2187 }
2188 map.serialize_entry("logicalType", "decimal")?;
2189 map.serialize_entry("scale", scale)?;
2190 map.serialize_entry("precision", precision)?;
2191 map.end()
2192 }
2193
2194 Schema::BigDecimal => {
2195 let mut map = serializer.serialize_map(None)?;
2196 map.serialize_entry("type", "bytes")?;
2197 map.serialize_entry("logicalType", "big-decimal")?;
2198 map.end()
2199 }
2200 Schema::Uuid => {
2201 let mut map = serializer.serialize_map(None)?;
2202 map.serialize_entry("type", "string")?;
2203 map.serialize_entry("logicalType", "uuid")?;
2204 map.end()
2205 }
2206 Schema::Date => {
2207 let mut map = serializer.serialize_map(None)?;
2208 map.serialize_entry("type", "int")?;
2209 map.serialize_entry("logicalType", "date")?;
2210 map.end()
2211 }
2212 Schema::TimeMillis => {
2213 let mut map = serializer.serialize_map(None)?;
2214 map.serialize_entry("type", "int")?;
2215 map.serialize_entry("logicalType", "time-millis")?;
2216 map.end()
2217 }
2218 Schema::TimeMicros => {
2219 let mut map = serializer.serialize_map(None)?;
2220 map.serialize_entry("type", "long")?;
2221 map.serialize_entry("logicalType", "time-micros")?;
2222 map.end()
2223 }
2224 Schema::TimestampMillis => {
2225 let mut map = serializer.serialize_map(None)?;
2226 map.serialize_entry("type", "long")?;
2227 map.serialize_entry("logicalType", "timestamp-millis")?;
2228 map.end()
2229 }
2230 Schema::TimestampMicros => {
2231 let mut map = serializer.serialize_map(None)?;
2232 map.serialize_entry("type", "long")?;
2233 map.serialize_entry("logicalType", "timestamp-micros")?;
2234 map.end()
2235 }
2236 Schema::TimestampNanos => {
2237 let mut map = serializer.serialize_map(None)?;
2238 map.serialize_entry("type", "long")?;
2239 map.serialize_entry("logicalType", "timestamp-nanos")?;
2240 map.end()
2241 }
2242 Schema::LocalTimestampMillis => {
2243 let mut map = serializer.serialize_map(None)?;
2244 map.serialize_entry("type", "long")?;
2245 map.serialize_entry("logicalType", "local-timestamp-millis")?;
2246 map.end()
2247 }
2248 Schema::LocalTimestampMicros => {
2249 let mut map = serializer.serialize_map(None)?;
2250 map.serialize_entry("type", "long")?;
2251 map.serialize_entry("logicalType", "local-timestamp-micros")?;
2252 map.end()
2253 }
2254 Schema::LocalTimestampNanos => {
2255 let mut map = serializer.serialize_map(None)?;
2256 map.serialize_entry("type", "long")?;
2257 map.serialize_entry("logicalType", "local-timestamp-nanos")?;
2258 map.end()
2259 }
2260 Schema::Duration => {
2261 let mut map = serializer.serialize_map(None)?;
2262
2263 let inner = Schema::Fixed(FixedSchema {
2266 name: Name::new("duration").unwrap(),
2267 aliases: None,
2268 doc: None,
2269 size: 12,
2270 default: None,
2271 attributes: Default::default(),
2272 });
2273 map.serialize_entry("type", &inner)?;
2274 map.serialize_entry("logicalType", "duration")?;
2275 map.end()
2276 }
2277 }
2278 }
2279}
2280
2281impl Serialize for RecordField {
2282 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2283 where
2284 S: Serializer,
2285 {
2286 let mut map = serializer.serialize_map(None)?;
2287 map.serialize_entry("name", &self.name)?;
2288 map.serialize_entry("type", &self.schema)?;
2289
2290 if let Some(ref default) = self.default {
2291 map.serialize_entry("default", default)?;
2292 }
2293
2294 if let Some(ref aliases) = self.aliases {
2295 map.serialize_entry("aliases", aliases)?;
2296 }
2297
2298 for attr in &self.custom_attributes {
2299 map.serialize_entry(attr.0, attr.1)?;
2300 }
2301
2302 map.end()
2303 }
2304}
2305
2306fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet<String>) -> String {
2309 match schema {
2310 Value::Object(map) => pcf_map(map, defined_names),
2311 Value::String(s) => pcf_string(s),
2312 Value::Array(v) => pcf_array(v, defined_names),
2313 json => panic!("got invalid JSON value for canonical form of schema: {json}"),
2314 }
2315}
2316
2317fn pcf_map(schema: &Map<String, Value>, defined_names: &mut HashSet<String>) -> String {
2318 let ns = schema.get("namespace").and_then(|v| v.as_str());
2320 let typ = schema.get("type").and_then(|v| v.as_str());
2321 let raw_name = schema.get("name").and_then(|v| v.as_str());
2322 let name = if is_named_type(typ) {
2323 Some(format!(
2324 "{}{}",
2325 ns.map_or("".to_string(), |n| { format!("{n}.") }),
2326 raw_name.unwrap_or_default()
2327 ))
2328 } else {
2329 None
2330 };
2331
2332 if let Some(ref n) = name {
2334 if defined_names.contains(n) {
2335 return pcf_string(n);
2336 } else {
2337 defined_names.insert(n.clone());
2338 }
2339 }
2340
2341 let mut fields = Vec::new();
2342 for (k, v) in schema {
2343 if schema.len() == 1 && k == "type" {
2345 if let Value::String(s) = v {
2347 return pcf_string(s);
2348 }
2349 }
2350
2351 if field_ordering_position(k).is_none()
2353 || k == "default"
2354 || k == "doc"
2355 || k == "aliases"
2356 || k == "logicalType"
2357 {
2358 continue;
2359 }
2360
2361 if k == "name" {
2363 if let Some(ref n) = name {
2364 fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n))));
2365 continue;
2366 }
2367 }
2368
2369 if k == "size" || k == "precision" || k == "scale" {
2371 let i = match v.as_str() {
2372 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2373 None => v.as_i64().unwrap(),
2374 };
2375 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2376 continue;
2377 }
2378
2379 fields.push((
2381 k,
2382 format!(
2383 "{}:{}",
2384 pcf_string(k),
2385 parsing_canonical_form(v, defined_names)
2386 ),
2387 ));
2388 }
2389
2390 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2392 let inter = fields
2393 .into_iter()
2394 .map(|(_, v)| v)
2395 .collect::<Vec<_>>()
2396 .join(",");
2397 format!("{{{inter}}}")
2398}
2399
2400fn is_named_type(typ: Option<&str>) -> bool {
2401 matches!(
2402 typ,
2403 Some("record") | Some("enum") | Some("fixed") | Some("ref")
2404 )
2405}
2406
2407fn pcf_array(arr: &[Value], defined_names: &mut HashSet<String>) -> String {
2408 let inter = arr
2409 .iter()
2410 .map(|a| parsing_canonical_form(a, defined_names))
2411 .collect::<Vec<String>>()
2412 .join(",");
2413 format!("[{inter}]")
2414}
2415
2416fn pcf_string(s: &str) -> String {
2417 format!("\"{s}\"")
2418}
2419
2420const RESERVED_FIELDS: &[&str] = &[
2421 "name",
2422 "type",
2423 "fields",
2424 "symbols",
2425 "items",
2426 "values",
2427 "size",
2428 "logicalType",
2429 "order",
2430 "doc",
2431 "aliases",
2432 "default",
2433 "precision",
2434 "scale",
2435];
2436
2437fn field_ordering_position(field: &str) -> Option<usize> {
2439 RESERVED_FIELDS
2440 .iter()
2441 .position(|&f| f == field)
2442 .map(|pos| pos + 1)
2443}
2444
2445pub trait AvroSchema {
2450 fn get_schema() -> Schema;
2451}
2452
2453#[cfg(feature = "derive")]
2454pub mod derive {
2455 use super::*;
2456 use std::borrow::Cow;
2457
2458 pub trait AvroSchemaComponent {
2507 fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
2508 -> Schema;
2509 }
2510
2511 impl<T> AvroSchema for T
2512 where
2513 T: AvroSchemaComponent,
2514 {
2515 fn get_schema() -> Schema {
2516 T::get_schema_in_ctxt(&mut HashMap::default(), &None)
2517 }
2518 }
2519
2520 macro_rules! impl_schema (
2521 ($type:ty, $variant_constructor:expr) => (
2522 impl AvroSchemaComponent for $type {
2523 fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
2524 $variant_constructor
2525 }
2526 }
2527 );
2528 );
2529
2530 impl_schema!(bool, Schema::Boolean);
2531 impl_schema!(i8, Schema::Int);
2532 impl_schema!(i16, Schema::Int);
2533 impl_schema!(i32, Schema::Int);
2534 impl_schema!(i64, Schema::Long);
2535 impl_schema!(u8, Schema::Int);
2536 impl_schema!(u16, Schema::Int);
2537 impl_schema!(u32, Schema::Long);
2538 impl_schema!(f32, Schema::Float);
2539 impl_schema!(f64, Schema::Double);
2540 impl_schema!(String, Schema::String);
2541 impl_schema!(uuid::Uuid, Schema::Uuid);
2542 impl_schema!(core::time::Duration, Schema::Duration);
2543
2544 impl<T> AvroSchemaComponent for Vec<T>
2545 where
2546 T: AvroSchemaComponent,
2547 {
2548 fn get_schema_in_ctxt(
2549 named_schemas: &mut Names,
2550 enclosing_namespace: &Namespace,
2551 ) -> Schema {
2552 Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2553 }
2554 }
2555
2556 impl<T> AvroSchemaComponent for Option<T>
2557 where
2558 T: AvroSchemaComponent,
2559 {
2560 fn get_schema_in_ctxt(
2561 named_schemas: &mut Names,
2562 enclosing_namespace: &Namespace,
2563 ) -> Schema {
2564 let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
2565 Schema::Union(UnionSchema {
2566 schemas: vec![Schema::Null, inner_schema.clone()],
2567 variant_index: vec![Schema::Null, inner_schema]
2568 .iter()
2569 .enumerate()
2570 .map(|(idx, s)| (SchemaKind::from(s), idx))
2571 .collect(),
2572 })
2573 }
2574 }
2575
2576 impl<T> AvroSchemaComponent for Map<String, T>
2577 where
2578 T: AvroSchemaComponent,
2579 {
2580 fn get_schema_in_ctxt(
2581 named_schemas: &mut Names,
2582 enclosing_namespace: &Namespace,
2583 ) -> Schema {
2584 Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2585 }
2586 }
2587
2588 impl<T> AvroSchemaComponent for HashMap<String, T>
2589 where
2590 T: AvroSchemaComponent,
2591 {
2592 fn get_schema_in_ctxt(
2593 named_schemas: &mut Names,
2594 enclosing_namespace: &Namespace,
2595 ) -> Schema {
2596 Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2597 }
2598 }
2599
2600 impl<T> AvroSchemaComponent for Box<T>
2601 where
2602 T: AvroSchemaComponent,
2603 {
2604 fn get_schema_in_ctxt(
2605 named_schemas: &mut Names,
2606 enclosing_namespace: &Namespace,
2607 ) -> Schema {
2608 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2609 }
2610 }
2611
2612 impl<T> AvroSchemaComponent for std::sync::Mutex<T>
2613 where
2614 T: AvroSchemaComponent,
2615 {
2616 fn get_schema_in_ctxt(
2617 named_schemas: &mut Names,
2618 enclosing_namespace: &Namespace,
2619 ) -> Schema {
2620 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2621 }
2622 }
2623
2624 impl<T> AvroSchemaComponent for Cow<'_, T>
2625 where
2626 T: AvroSchemaComponent + Clone,
2627 {
2628 fn get_schema_in_ctxt(
2629 named_schemas: &mut Names,
2630 enclosing_namespace: &Namespace,
2631 ) -> Schema {
2632 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2633 }
2634 }
2635}
2636
2637#[cfg(test)]
2638mod tests {
2639 use super::*;
2640 use crate::{SpecificSingleObjectWriter, error::Details, rabin::Rabin};
2641 use apache_avro_test_helper::{
2642 TestResult,
2643 logger::{assert_logged, assert_not_logged},
2644 };
2645 use serde_json::json;
2646 use serial_test::serial;
2647 use std::sync::atomic::Ordering;
2648
2649 #[test]
2650 fn test_invalid_schema() {
2651 assert!(Schema::parse_str("invalid").is_err());
2652 }
2653
2654 #[test]
2655 fn test_primitive_schema() -> TestResult {
2656 assert_eq!(Schema::Null, Schema::parse_str("\"null\"")?);
2657 assert_eq!(Schema::Int, Schema::parse_str("\"int\"")?);
2658 assert_eq!(Schema::Double, Schema::parse_str("\"double\"")?);
2659 Ok(())
2660 }
2661
2662 #[test]
2663 fn test_array_schema() -> TestResult {
2664 let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?;
2665 assert_eq!(Schema::array(Schema::String), schema);
2666 Ok(())
2667 }
2668
2669 #[test]
2670 fn test_map_schema() -> TestResult {
2671 let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?;
2672 assert_eq!(Schema::map(Schema::Double), schema);
2673 Ok(())
2674 }
2675
2676 #[test]
2677 fn test_union_schema() -> TestResult {
2678 let schema = Schema::parse_str(r#"["null", "int"]"#)?;
2679 assert_eq!(
2680 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
2681 schema
2682 );
2683 Ok(())
2684 }
2685
2686 #[test]
2687 fn test_union_unsupported_schema() {
2688 let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
2689 assert!(schema.is_err());
2690 }
2691
2692 #[test]
2693 fn test_multi_union_schema() -> TestResult {
2694 let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
2695 assert!(schema.is_ok());
2696 let schema = schema?;
2697 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2698 let union_schema = match schema {
2699 Schema::Union(u) => u,
2700 _ => unreachable!(),
2701 };
2702 assert_eq!(union_schema.variants().len(), 5);
2703 let mut variants = union_schema.variants().iter();
2704 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null);
2705 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int);
2706 assert_eq!(
2707 SchemaKind::from(variants.next().unwrap()),
2708 SchemaKind::Float
2709 );
2710 assert_eq!(
2711 SchemaKind::from(variants.next().unwrap()),
2712 SchemaKind::String
2713 );
2714 assert_eq!(
2715 SchemaKind::from(variants.next().unwrap()),
2716 SchemaKind::Bytes
2717 );
2718 assert_eq!(variants.next(), None);
2719
2720 Ok(())
2721 }
2722
2723 #[test]
2724 fn test_avro_3621_nullable_record_field() -> TestResult {
2725 let nullable_record_field = RecordField::builder()
2726 .name("next".to_string())
2727 .schema(Schema::Union(UnionSchema::new(vec![
2728 Schema::Null,
2729 Schema::Ref {
2730 name: Name {
2731 name: "LongList".to_owned(),
2732 namespace: None,
2733 },
2734 },
2735 ])?))
2736 .order(RecordFieldOrder::Ascending)
2737 .position(1)
2738 .build();
2739
2740 assert!(nullable_record_field.is_nullable());
2741
2742 let non_nullable_record_field = RecordField::builder()
2743 .name("next".to_string())
2744 .default(json!(2))
2745 .schema(Schema::Long)
2746 .order(RecordFieldOrder::Ascending)
2747 .position(1)
2748 .build();
2749
2750 assert!(!non_nullable_record_field.is_nullable());
2751 Ok(())
2752 }
2753
2754 #[test]
2756 fn test_union_of_records() -> TestResult {
2757 use std::iter::FromIterator;
2758
2759 let schema_str_a = r#"{
2761 "name": "A",
2762 "type": "record",
2763 "fields": [
2764 {"name": "field_one", "type": "float"}
2765 ]
2766 }"#;
2767
2768 let schema_str_b = r#"{
2769 "name": "B",
2770 "type": "record",
2771 "fields": [
2772 {"name": "field_one", "type": "float"}
2773 ]
2774 }"#;
2775
2776 let schema_str_c = r#"{
2778 "name": "C",
2779 "type": "record",
2780 "fields": [
2781 {"name": "field_one", "type": ["A", "B"]}
2782 ]
2783 }"#;
2784
2785 let schema_c = Schema::parse_list([schema_str_a, schema_str_b, schema_str_c])?
2786 .last()
2787 .unwrap()
2788 .clone();
2789
2790 let schema_c_expected = Schema::Record(
2791 RecordSchema::builder()
2792 .name(Name::new("C")?)
2793 .fields(vec![
2794 RecordField::builder()
2795 .name("field_one".to_string())
2796 .schema(Schema::Union(UnionSchema::new(vec![
2797 Schema::Ref {
2798 name: Name::new("A")?,
2799 },
2800 Schema::Ref {
2801 name: Name::new("B")?,
2802 },
2803 ])?))
2804 .build(),
2805 ])
2806 .lookup(BTreeMap::from_iter(vec![("field_one".to_string(), 0)]))
2807 .build(),
2808 );
2809
2810 assert_eq!(schema_c, schema_c_expected);
2811 Ok(())
2812 }
2813
2814 #[test]
2815 fn avro_rs_104_test_root_union_of_records() -> TestResult {
2816 let schema_str_a = r#"{
2818 "name": "A",
2819 "type": "record",
2820 "fields": [
2821 {"name": "field_one", "type": "float"}
2822 ]
2823 }"#;
2824
2825 let schema_str_b = r#"{
2826 "name": "B",
2827 "type": "record",
2828 "fields": [
2829 {"name": "field_one", "type": "float"}
2830 ]
2831 }"#;
2832
2833 let schema_str_c = r#"["A", "B"]"#;
2834
2835 let (schema_c, schemata) =
2836 Schema::parse_str_with_list(schema_str_c, [schema_str_a, schema_str_b])?;
2837
2838 let schema_a_expected = Schema::Record(RecordSchema {
2839 name: Name::new("A")?,
2840 aliases: None,
2841 doc: None,
2842 fields: vec![RecordField {
2843 name: "field_one".to_string(),
2844 doc: None,
2845 default: None,
2846 aliases: None,
2847 schema: Schema::Float,
2848 order: RecordFieldOrder::Ignore,
2849 position: 0,
2850 custom_attributes: Default::default(),
2851 }],
2852 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2853 attributes: Default::default(),
2854 });
2855
2856 let schema_b_expected = Schema::Record(RecordSchema {
2857 name: Name::new("B")?,
2858 aliases: None,
2859 doc: None,
2860 fields: vec![RecordField {
2861 name: "field_one".to_string(),
2862 doc: None,
2863 default: None,
2864 aliases: None,
2865 schema: Schema::Float,
2866 order: RecordFieldOrder::Ignore,
2867 position: 0,
2868 custom_attributes: Default::default(),
2869 }],
2870 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2871 attributes: Default::default(),
2872 });
2873
2874 let schema_c_expected = Schema::Union(UnionSchema::new(vec![
2875 Schema::Ref {
2876 name: Name::new("A")?,
2877 },
2878 Schema::Ref {
2879 name: Name::new("B")?,
2880 },
2881 ])?);
2882
2883 assert_eq!(schema_c, schema_c_expected);
2884 assert_eq!(schemata[0], schema_a_expected);
2885 assert_eq!(schemata[1], schema_b_expected);
2886
2887 Ok(())
2888 }
2889
2890 #[test]
2891 fn avro_rs_104_test_root_union_of_records_name_collision() -> TestResult {
2892 let schema_str_a1 = r#"{
2894 "name": "A",
2895 "type": "record",
2896 "fields": [
2897 {"name": "field_one", "type": "float"}
2898 ]
2899 }"#;
2900
2901 let schema_str_a2 = r#"{
2902 "name": "A",
2903 "type": "record",
2904 "fields": [
2905 {"name": "field_one", "type": "float"}
2906 ]
2907 }"#;
2908
2909 let schema_str_c = r#"["A", "A"]"#;
2910
2911 match Schema::parse_str_with_list(schema_str_c, [schema_str_a1, schema_str_a2]) {
2912 Ok(_) => unreachable!("Expected an error that the name is already defined"),
2913 Err(e) => assert_eq!(
2914 e.to_string(),
2915 "Two schemas with the same fullname were given: \"A\""
2916 ),
2917 }
2918
2919 Ok(())
2920 }
2921
2922 #[test]
2923 fn avro_rs_104_test_root_union_of_records_no_name() -> TestResult {
2924 let schema_str_a = r#"{
2925 "name": "A",
2926 "type": "record",
2927 "fields": [
2928 {"name": "field_one", "type": "float"}
2929 ]
2930 }"#;
2931
2932 let schema_str_b = r#"{
2934 "type": "record",
2935 "fields": [
2936 {"name": "field_one", "type": "float"}
2937 ]
2938 }"#;
2939
2940 let schema_str_c = r#"["A", "A"]"#;
2941
2942 match Schema::parse_str_with_list(schema_str_c, [schema_str_a, schema_str_b]) {
2943 Ok(_) => unreachable!("Expected an error that schema_str_b is missing a name field"),
2944 Err(e) => assert_eq!(e.to_string(), "No `name` field"),
2945 }
2946
2947 Ok(())
2948 }
2949
2950 #[test]
2951 fn avro_3584_test_recursion_records() -> TestResult {
2952 let schema_str_a = r#"{
2954 "name": "A",
2955 "type": "record",
2956 "fields": [ {"name": "field_one", "type": "B"} ]
2957 }"#;
2958
2959 let schema_str_b = r#"{
2960 "name": "B",
2961 "type": "record",
2962 "fields": [ {"name": "field_one", "type": "A"} ]
2963 }"#;
2964
2965 let list = Schema::parse_list([schema_str_a, schema_str_b])?;
2966
2967 let schema_a = list.first().unwrap().clone();
2968
2969 match schema_a {
2970 Schema::Record(RecordSchema { fields, .. }) => {
2971 let f1 = fields.first();
2972
2973 let ref_schema = Schema::Ref {
2974 name: Name::new("B")?,
2975 };
2976 assert_eq!(ref_schema, f1.unwrap().schema);
2977 }
2978 _ => panic!("Expected a record schema!"),
2979 }
2980
2981 Ok(())
2982 }
2983
2984 #[test]
2985 fn test_avro_3248_nullable_record() -> TestResult {
2986 use std::iter::FromIterator;
2987
2988 let schema_str_a = r#"{
2989 "name": "A",
2990 "type": "record",
2991 "fields": [
2992 {"name": "field_one", "type": "float"}
2993 ]
2994 }"#;
2995
2996 let schema_str_option_a = r#"{
2998 "name": "OptionA",
2999 "type": "record",
3000 "fields": [
3001 {"name": "field_one", "type": ["null", "A"], "default": null}
3002 ]
3003 }"#;
3004
3005 let schema_option_a = Schema::parse_list([schema_str_a, schema_str_option_a])?
3006 .last()
3007 .unwrap()
3008 .clone();
3009
3010 let schema_option_a_expected = Schema::Record(RecordSchema {
3011 name: Name::new("OptionA")?,
3012 aliases: None,
3013 doc: None,
3014 fields: vec![RecordField {
3015 name: "field_one".to_string(),
3016 doc: None,
3017 default: Some(Value::Null),
3018 aliases: None,
3019 schema: Schema::Union(UnionSchema::new(vec![
3020 Schema::Null,
3021 Schema::Ref {
3022 name: Name::new("A")?,
3023 },
3024 ])?),
3025 order: RecordFieldOrder::Ignore,
3026 position: 0,
3027 custom_attributes: Default::default(),
3028 }],
3029 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
3030 attributes: Default::default(),
3031 });
3032
3033 assert_eq!(schema_option_a, schema_option_a_expected);
3034
3035 Ok(())
3036 }
3037
3038 #[test]
3039 fn test_record_schema() -> TestResult {
3040 let parsed = Schema::parse_str(
3041 r#"
3042 {
3043 "type": "record",
3044 "name": "test",
3045 "fields": [
3046 {"name": "a", "type": "long", "default": 42},
3047 {"name": "b", "type": "string"}
3048 ]
3049 }
3050 "#,
3051 )?;
3052
3053 let mut lookup = BTreeMap::new();
3054 lookup.insert("a".to_owned(), 0);
3055 lookup.insert("b".to_owned(), 1);
3056
3057 let expected = Schema::Record(RecordSchema {
3058 name: Name::new("test")?,
3059 aliases: None,
3060 doc: None,
3061 fields: vec![
3062 RecordField {
3063 name: "a".to_string(),
3064 doc: None,
3065 default: Some(Value::Number(42i64.into())),
3066 aliases: None,
3067 schema: Schema::Long,
3068 order: RecordFieldOrder::Ascending,
3069 position: 0,
3070 custom_attributes: Default::default(),
3071 },
3072 RecordField {
3073 name: "b".to_string(),
3074 doc: None,
3075 default: None,
3076 aliases: None,
3077 schema: Schema::String,
3078 order: RecordFieldOrder::Ascending,
3079 position: 1,
3080 custom_attributes: Default::default(),
3081 },
3082 ],
3083 lookup,
3084 attributes: Default::default(),
3085 });
3086
3087 assert_eq!(parsed, expected);
3088
3089 Ok(())
3090 }
3091
3092 #[test]
3093 fn test_avro_3302_record_schema_with_currently_parsing_schema() -> TestResult {
3094 let schema = Schema::parse_str(
3095 r#"
3096 {
3097 "type": "record",
3098 "name": "test",
3099 "fields": [{
3100 "name": "recordField",
3101 "type": {
3102 "type": "record",
3103 "name": "Node",
3104 "fields": [
3105 {"name": "label", "type": "string"},
3106 {"name": "children", "type": {"type": "array", "items": "Node"}}
3107 ]
3108 }
3109 }]
3110 }
3111 "#,
3112 )?;
3113
3114 let mut lookup = BTreeMap::new();
3115 lookup.insert("recordField".to_owned(), 0);
3116
3117 let mut node_lookup = BTreeMap::new();
3118 node_lookup.insert("children".to_owned(), 1);
3119 node_lookup.insert("label".to_owned(), 0);
3120
3121 let expected = Schema::Record(RecordSchema {
3122 name: Name::new("test")?,
3123 aliases: None,
3124 doc: None,
3125 fields: vec![RecordField {
3126 name: "recordField".to_string(),
3127 doc: None,
3128 default: None,
3129 aliases: None,
3130 schema: Schema::Record(RecordSchema {
3131 name: Name::new("Node")?,
3132 aliases: None,
3133 doc: None,
3134 fields: vec![
3135 RecordField {
3136 name: "label".to_string(),
3137 doc: None,
3138 default: None,
3139 aliases: None,
3140 schema: Schema::String,
3141 order: RecordFieldOrder::Ascending,
3142 position: 0,
3143 custom_attributes: Default::default(),
3144 },
3145 RecordField {
3146 name: "children".to_string(),
3147 doc: None,
3148 default: None,
3149 aliases: None,
3150 schema: Schema::array(Schema::Ref {
3151 name: Name::new("Node")?,
3152 }),
3153 order: RecordFieldOrder::Ascending,
3154 position: 1,
3155 custom_attributes: Default::default(),
3156 },
3157 ],
3158 lookup: node_lookup,
3159 attributes: Default::default(),
3160 }),
3161 order: RecordFieldOrder::Ascending,
3162 position: 0,
3163 custom_attributes: Default::default(),
3164 }],
3165 lookup,
3166 attributes: Default::default(),
3167 });
3168 assert_eq!(schema, expected);
3169
3170 let canonical_form = &schema.canonical_form();
3171 let expected = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
3172 assert_eq!(canonical_form, &expected);
3173
3174 Ok(())
3175 }
3176
3177 #[test]
3179 fn test_parsing_of_recursive_type_enum() -> TestResult {
3180 let schema = r#"
3181 {
3182 "type": "record",
3183 "name": "User",
3184 "namespace": "office",
3185 "fields": [
3186 {
3187 "name": "details",
3188 "type": [
3189 {
3190 "type": "record",
3191 "name": "Employee",
3192 "fields": [
3193 {
3194 "name": "gender",
3195 "type": {
3196 "type": "enum",
3197 "name": "Gender",
3198 "symbols": [
3199 "male",
3200 "female"
3201 ]
3202 },
3203 "default": "female"
3204 }
3205 ]
3206 },
3207 {
3208 "type": "record",
3209 "name": "Manager",
3210 "fields": [
3211 {
3212 "name": "gender",
3213 "type": "Gender"
3214 }
3215 ]
3216 }
3217 ]
3218 }
3219 ]
3220 }
3221 "#;
3222
3223 let schema = Schema::parse_str(schema)?;
3224 let schema_str = schema.canonical_form();
3225 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"gender","type":{"name":"office.Gender","type":"enum","symbols":["male","female"]}}]},{"name":"office.Manager","type":"record","fields":[{"name":"gender","type":"office.Gender"}]}]}]}"#;
3226 assert_eq!(schema_str, expected);
3227
3228 Ok(())
3229 }
3230
3231 #[test]
3232 fn test_parsing_of_recursive_type_fixed() -> TestResult {
3233 let schema = r#"
3234 {
3235 "type": "record",
3236 "name": "User",
3237 "namespace": "office",
3238 "fields": [
3239 {
3240 "name": "details",
3241 "type": [
3242 {
3243 "type": "record",
3244 "name": "Employee",
3245 "fields": [
3246 {
3247 "name": "id",
3248 "type": {
3249 "type": "fixed",
3250 "name": "EmployeeId",
3251 "size": 16
3252 },
3253 "default": "female"
3254 }
3255 ]
3256 },
3257 {
3258 "type": "record",
3259 "name": "Manager",
3260 "fields": [
3261 {
3262 "name": "id",
3263 "type": "EmployeeId"
3264 }
3265 ]
3266 }
3267 ]
3268 }
3269 ]
3270 }
3271 "#;
3272
3273 let schema = Schema::parse_str(schema)?;
3274 let schema_str = schema.canonical_form();
3275 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"id","type":{"name":"office.EmployeeId","type":"fixed","size":16}}]},{"name":"office.Manager","type":"record","fields":[{"name":"id","type":"office.EmployeeId"}]}]}]}"#;
3276 assert_eq!(schema_str, expected);
3277
3278 Ok(())
3279 }
3280
3281 #[test]
3282 fn test_avro_3302_record_schema_with_currently_parsing_schema_aliases() -> TestResult {
3283 let schema = Schema::parse_str(
3284 r#"
3285 {
3286 "type": "record",
3287 "name": "LongList",
3288 "aliases": ["LinkedLongs"],
3289 "fields" : [
3290 {"name": "value", "type": "long"},
3291 {"name": "next", "type": ["null", "LinkedLongs"]}
3292 ]
3293 }
3294 "#,
3295 )?;
3296
3297 let mut lookup = BTreeMap::new();
3298 lookup.insert("value".to_owned(), 0);
3299 lookup.insert("next".to_owned(), 1);
3300
3301 let expected = Schema::Record(RecordSchema {
3302 name: Name {
3303 name: "LongList".to_owned(),
3304 namespace: None,
3305 },
3306 aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
3307 doc: None,
3308 fields: vec![
3309 RecordField {
3310 name: "value".to_string(),
3311 doc: None,
3312 default: None,
3313 aliases: None,
3314 schema: Schema::Long,
3315 order: RecordFieldOrder::Ascending,
3316 position: 0,
3317 custom_attributes: Default::default(),
3318 },
3319 RecordField {
3320 name: "next".to_string(),
3321 doc: None,
3322 default: None,
3323 aliases: None,
3324 schema: Schema::Union(UnionSchema::new(vec![
3325 Schema::Null,
3326 Schema::Ref {
3327 name: Name {
3328 name: "LongList".to_owned(),
3329 namespace: None,
3330 },
3331 },
3332 ])?),
3333 order: RecordFieldOrder::Ascending,
3334 position: 1,
3335 custom_attributes: Default::default(),
3336 },
3337 ],
3338 lookup,
3339 attributes: Default::default(),
3340 });
3341 assert_eq!(schema, expected);
3342
3343 let canonical_form = &schema.canonical_form();
3344 let expected = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
3345 assert_eq!(canonical_form, &expected);
3346
3347 Ok(())
3348 }
3349
3350 #[test]
3351 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_record() -> TestResult {
3352 let schema = Schema::parse_str(
3353 r#"
3354 {
3355 "type" : "record",
3356 "name" : "record",
3357 "fields" : [
3358 { "name" : "value", "type" : "long" },
3359 { "name" : "next", "type" : "record" }
3360 ]
3361 }
3362 "#,
3363 )?;
3364
3365 let mut lookup = BTreeMap::new();
3366 lookup.insert("value".to_owned(), 0);
3367 lookup.insert("next".to_owned(), 1);
3368
3369 let expected = Schema::Record(RecordSchema {
3370 name: Name {
3371 name: "record".to_owned(),
3372 namespace: None,
3373 },
3374 aliases: None,
3375 doc: None,
3376 fields: vec![
3377 RecordField {
3378 name: "value".to_string(),
3379 doc: None,
3380 default: None,
3381 aliases: None,
3382 schema: Schema::Long,
3383 order: RecordFieldOrder::Ascending,
3384 position: 0,
3385 custom_attributes: Default::default(),
3386 },
3387 RecordField {
3388 name: "next".to_string(),
3389 doc: None,
3390 default: None,
3391 aliases: None,
3392 schema: Schema::Ref {
3393 name: Name {
3394 name: "record".to_owned(),
3395 namespace: None,
3396 },
3397 },
3398 order: RecordFieldOrder::Ascending,
3399 position: 1,
3400 custom_attributes: Default::default(),
3401 },
3402 ],
3403 lookup,
3404 attributes: Default::default(),
3405 });
3406 assert_eq!(schema, expected);
3407
3408 let canonical_form = &schema.canonical_form();
3409 let expected = r#"{"name":"record","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":"record"}]}"#;
3410 assert_eq!(canonical_form, &expected);
3411
3412 Ok(())
3413 }
3414
3415 #[test]
3416 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_enum() -> TestResult {
3417 let schema = Schema::parse_str(
3418 r#"
3419 {
3420 "type" : "record",
3421 "name" : "record",
3422 "fields" : [
3423 {
3424 "type" : "enum",
3425 "name" : "enum",
3426 "symbols": ["one", "two", "three"]
3427 },
3428 { "name" : "next", "type" : "enum" }
3429 ]
3430 }
3431 "#,
3432 )?;
3433
3434 let mut lookup = BTreeMap::new();
3435 lookup.insert("enum".to_owned(), 0);
3436 lookup.insert("next".to_owned(), 1);
3437
3438 let expected = Schema::Record(RecordSchema {
3439 name: Name {
3440 name: "record".to_owned(),
3441 namespace: None,
3442 },
3443 aliases: None,
3444 doc: None,
3445 fields: vec![
3446 RecordField {
3447 name: "enum".to_string(),
3448 doc: None,
3449 default: None,
3450 aliases: None,
3451 schema: Schema::Enum(
3452 EnumSchema::builder()
3453 .name(Name::new("enum")?)
3454 .symbols(vec![
3455 "one".to_string(),
3456 "two".to_string(),
3457 "three".to_string(),
3458 ])
3459 .build(),
3460 ),
3461 order: RecordFieldOrder::Ascending,
3462 position: 0,
3463 custom_attributes: Default::default(),
3464 },
3465 RecordField {
3466 name: "next".to_string(),
3467 doc: None,
3468 default: None,
3469 aliases: None,
3470 schema: Schema::Enum(EnumSchema {
3471 name: Name {
3472 name: "enum".to_owned(),
3473 namespace: None,
3474 },
3475 aliases: None,
3476 doc: None,
3477 symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
3478 default: None,
3479 attributes: Default::default(),
3480 }),
3481 order: RecordFieldOrder::Ascending,
3482 position: 1,
3483 custom_attributes: Default::default(),
3484 },
3485 ],
3486 lookup,
3487 attributes: Default::default(),
3488 });
3489 assert_eq!(schema, expected);
3490
3491 let canonical_form = &schema.canonical_form();
3492 let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#;
3493 assert_eq!(canonical_form, &expected);
3494
3495 Ok(())
3496 }
3497
3498 #[test]
3499 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_fixed() -> TestResult {
3500 let schema = Schema::parse_str(
3501 r#"
3502 {
3503 "type" : "record",
3504 "name" : "record",
3505 "fields" : [
3506 {
3507 "type" : "fixed",
3508 "name" : "fixed",
3509 "size": 456
3510 },
3511 { "name" : "next", "type" : "fixed" }
3512 ]
3513 }
3514 "#,
3515 )?;
3516
3517 let mut lookup = BTreeMap::new();
3518 lookup.insert("fixed".to_owned(), 0);
3519 lookup.insert("next".to_owned(), 1);
3520
3521 let expected = Schema::Record(RecordSchema {
3522 name: Name {
3523 name: "record".to_owned(),
3524 namespace: None,
3525 },
3526 aliases: None,
3527 doc: None,
3528 fields: vec![
3529 RecordField {
3530 name: "fixed".to_string(),
3531 doc: None,
3532 default: None,
3533 aliases: None,
3534 schema: Schema::Fixed(FixedSchema {
3535 name: Name {
3536 name: "fixed".to_owned(),
3537 namespace: None,
3538 },
3539 aliases: None,
3540 doc: None,
3541 size: 456,
3542 default: None,
3543 attributes: Default::default(),
3544 }),
3545 order: RecordFieldOrder::Ascending,
3546 position: 0,
3547 custom_attributes: Default::default(),
3548 },
3549 RecordField {
3550 name: "next".to_string(),
3551 doc: None,
3552 default: None,
3553 aliases: None,
3554 schema: Schema::Fixed(FixedSchema {
3555 name: Name {
3556 name: "fixed".to_owned(),
3557 namespace: None,
3558 },
3559 aliases: None,
3560 doc: None,
3561 size: 456,
3562 default: None,
3563 attributes: Default::default(),
3564 }),
3565 order: RecordFieldOrder::Ascending,
3566 position: 1,
3567 custom_attributes: Default::default(),
3568 },
3569 ],
3570 lookup,
3571 attributes: Default::default(),
3572 });
3573 assert_eq!(schema, expected);
3574
3575 let canonical_form = &schema.canonical_form();
3576 let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#;
3577 assert_eq!(canonical_form, &expected);
3578
3579 Ok(())
3580 }
3581
3582 #[test]
3583 fn test_enum_schema() -> TestResult {
3584 let schema = Schema::parse_str(
3585 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
3586 )?;
3587
3588 let expected = Schema::Enum(EnumSchema {
3589 name: Name::new("Suit")?,
3590 aliases: None,
3591 doc: None,
3592 symbols: vec![
3593 "diamonds".to_owned(),
3594 "spades".to_owned(),
3595 "clubs".to_owned(),
3596 "hearts".to_owned(),
3597 ],
3598 default: None,
3599 attributes: Default::default(),
3600 });
3601
3602 assert_eq!(expected, schema);
3603
3604 Ok(())
3605 }
3606
3607 #[test]
3608 fn test_enum_schema_duplicate() -> TestResult {
3609 let schema = Schema::parse_str(
3611 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "diamonds"]}"#,
3612 );
3613 assert!(schema.is_err());
3614
3615 Ok(())
3616 }
3617
3618 #[test]
3619 fn test_enum_schema_name() -> TestResult {
3620 let schema = Schema::parse_str(
3622 r#"{"type": "enum", "name": "Enum", "symbols": ["0000", "variant"]}"#,
3623 );
3624 assert!(schema.is_err());
3625
3626 Ok(())
3627 }
3628
3629 #[test]
3630 fn test_fixed_schema() -> TestResult {
3631 let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#)?;
3632
3633 let expected = Schema::Fixed(FixedSchema {
3634 name: Name::new("test")?,
3635 aliases: None,
3636 doc: None,
3637 size: 16_usize,
3638 default: None,
3639 attributes: Default::default(),
3640 });
3641
3642 assert_eq!(expected, schema);
3643
3644 Ok(())
3645 }
3646
3647 #[test]
3648 fn test_fixed_schema_with_documentation() -> TestResult {
3649 let schema = Schema::parse_str(
3650 r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#,
3651 )?;
3652
3653 let expected = Schema::Fixed(FixedSchema {
3654 name: Name::new("test")?,
3655 aliases: None,
3656 doc: Some(String::from("FixedSchema documentation")),
3657 size: 16_usize,
3658 default: None,
3659 attributes: Default::default(),
3660 });
3661
3662 assert_eq!(expected, schema);
3663
3664 Ok(())
3665 }
3666
3667 #[test]
3668 fn test_no_documentation() -> TestResult {
3669 let schema = Schema::parse_str(
3670 r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#,
3671 )?;
3672
3673 let doc = match schema {
3674 Schema::Enum(EnumSchema { doc, .. }) => doc,
3675 _ => unreachable!(),
3676 };
3677
3678 assert!(doc.is_none());
3679
3680 Ok(())
3681 }
3682
3683 #[test]
3684 fn test_documentation() -> TestResult {
3685 let schema = Schema::parse_str(
3686 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#,
3687 )?;
3688
3689 let doc = match schema {
3690 Schema::Enum(EnumSchema { doc, .. }) => doc,
3691 _ => None,
3692 };
3693
3694 assert_eq!("Some documentation".to_owned(), doc.unwrap());
3695
3696 Ok(())
3697 }
3698
3699 #[test]
3702 fn test_schema_is_send() {
3703 fn send<S: Send>(_s: S) {}
3704
3705 let schema = Schema::Null;
3706 send(schema);
3707 }
3708
3709 #[test]
3710 fn test_schema_is_sync() {
3711 fn sync<S: Sync>(_s: S) {}
3712
3713 let schema = Schema::Null;
3714 sync(&schema);
3715 sync(schema);
3716 }
3717
3718 #[test]
3719 fn test_schema_fingerprint() -> TestResult {
3720 use crate::rabin::Rabin;
3721 use md5::Md5;
3722 use sha2::Sha256;
3723
3724 let raw_schema = r#"
3725 {
3726 "type": "record",
3727 "name": "test",
3728 "fields": [
3729 {"name": "a", "type": "long", "default": 42},
3730 {"name": "b", "type": "string"},
3731 {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
3732 ]
3733 }
3734"#;
3735
3736 let schema = Schema::parse_str(raw_schema)?;
3737 assert_eq!(
3738 "7eb3b28d73dfc99bdd9af1848298b40804a2f8ad5d2642be2ecc2ad34842b987",
3739 format!("{}", schema.fingerprint::<Sha256>())
3740 );
3741
3742 assert_eq!(
3743 "cb11615e412ee5d872620d8df78ff6ae",
3744 format!("{}", schema.fingerprint::<Md5>())
3745 );
3746 assert_eq!(
3747 "92f2ccef718c6754",
3748 format!("{}", schema.fingerprint::<Rabin>())
3749 );
3750
3751 Ok(())
3752 }
3753
3754 #[test]
3755 fn test_logical_types() -> TestResult {
3756 let schema = Schema::parse_str(r#"{"type": "int", "logicalType": "date"}"#)?;
3757 assert_eq!(schema, Schema::Date);
3758
3759 let schema = Schema::parse_str(r#"{"type": "long", "logicalType": "timestamp-micros"}"#)?;
3760 assert_eq!(schema, Schema::TimestampMicros);
3761
3762 Ok(())
3763 }
3764
3765 #[test]
3766 fn test_nullable_logical_type() -> TestResult {
3767 let schema = Schema::parse_str(
3768 r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
3769 )?;
3770 assert_eq!(
3771 schema,
3772 Schema::Union(UnionSchema::new(vec![
3773 Schema::Null,
3774 Schema::TimestampMicros,
3775 ])?)
3776 );
3777
3778 Ok(())
3779 }
3780
3781 #[test]
3782 fn record_field_order_from_str() -> TestResult {
3783 use std::str::FromStr;
3784
3785 assert_eq!(
3786 RecordFieldOrder::from_str("ascending").unwrap(),
3787 RecordFieldOrder::Ascending
3788 );
3789 assert_eq!(
3790 RecordFieldOrder::from_str("descending").unwrap(),
3791 RecordFieldOrder::Descending
3792 );
3793 assert_eq!(
3794 RecordFieldOrder::from_str("ignore").unwrap(),
3795 RecordFieldOrder::Ignore
3796 );
3797 assert!(RecordFieldOrder::from_str("not an ordering").is_err());
3798
3799 Ok(())
3800 }
3801
3802 #[test]
3803 fn test_avro_3374_preserve_namespace_for_primitive() -> TestResult {
3804 let schema = Schema::parse_str(
3805 r#"
3806 {
3807 "type" : "record",
3808 "name" : "ns.int",
3809 "fields" : [
3810 {"name" : "value", "type" : "int"},
3811 {"name" : "next", "type" : [ "null", "ns.int" ]}
3812 ]
3813 }
3814 "#,
3815 )?;
3816
3817 let json = schema.canonical_form();
3818 assert_eq!(
3819 json,
3820 r#"{"name":"ns.int","type":"record","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","ns.int"]}]}"#
3821 );
3822
3823 Ok(())
3824 }
3825
3826 #[test]
3827 fn test_avro_3433_preserve_schema_refs_in_json() -> TestResult {
3828 let schema = r#"
3829 {
3830 "name": "test.test",
3831 "type": "record",
3832 "fields": [
3833 {
3834 "name": "bar",
3835 "type": { "name": "test.foo", "type": "record", "fields": [{ "name": "id", "type": "long" }] }
3836 },
3837 { "name": "baz", "type": "test.foo" }
3838 ]
3839 }
3840 "#;
3841
3842 let schema = Schema::parse_str(schema)?;
3843
3844 let expected = r#"{"name":"test.test","type":"record","fields":[{"name":"bar","type":{"name":"test.foo","type":"record","fields":[{"name":"id","type":"long"}]}},{"name":"baz","type":"test.foo"}]}"#;
3845 assert_eq!(schema.canonical_form(), expected);
3846
3847 Ok(())
3848 }
3849
3850 #[test]
3851 fn test_read_namespace_from_name() -> TestResult {
3852 let schema = r#"
3853 {
3854 "name": "space.name",
3855 "type": "record",
3856 "fields": [
3857 {
3858 "name": "num",
3859 "type": "int"
3860 }
3861 ]
3862 }
3863 "#;
3864
3865 let schema = Schema::parse_str(schema)?;
3866 if let Schema::Record(RecordSchema { name, .. }) = schema {
3867 assert_eq!(name.name, "name");
3868 assert_eq!(name.namespace, Some("space".to_string()));
3869 } else {
3870 panic!("Expected a record schema!");
3871 }
3872
3873 Ok(())
3874 }
3875
3876 #[test]
3877 fn test_namespace_from_name_has_priority_over_from_field() -> TestResult {
3878 let schema = r#"
3879 {
3880 "name": "space1.name",
3881 "namespace": "space2",
3882 "type": "record",
3883 "fields": [
3884 {
3885 "name": "num",
3886 "type": "int"
3887 }
3888 ]
3889 }
3890 "#;
3891
3892 let schema = Schema::parse_str(schema)?;
3893 if let Schema::Record(RecordSchema { name, .. }) = schema {
3894 assert_eq!(name.namespace, Some("space1".to_string()));
3895 } else {
3896 panic!("Expected a record schema!");
3897 }
3898
3899 Ok(())
3900 }
3901
3902 #[test]
3903 fn test_namespace_from_field() -> TestResult {
3904 let schema = r#"
3905 {
3906 "name": "name",
3907 "namespace": "space2",
3908 "type": "record",
3909 "fields": [
3910 {
3911 "name": "num",
3912 "type": "int"
3913 }
3914 ]
3915 }
3916 "#;
3917
3918 let schema = Schema::parse_str(schema)?;
3919 if let Schema::Record(RecordSchema { name, .. }) = schema {
3920 assert_eq!(name.namespace, Some("space2".to_string()));
3921 } else {
3922 panic!("Expected a record schema!");
3923 }
3924
3925 Ok(())
3926 }
3927
3928 #[test]
3929 fn test_namespace_from_name_with_empty_value() -> TestResult {
3931 let name = Name::new(".name")?;
3932 assert_eq!(name.name, "name");
3933 assert_eq!(name.namespace, None);
3934
3935 Ok(())
3936 }
3937
3938 #[test]
3939 fn test_name_with_whitespace_value() {
3941 match Name::new(" ").map_err(Error::into_details) {
3942 Err(Details::InvalidSchemaName(_, _)) => {}
3943 _ => panic!("Expected an Details::InvalidSchemaName!"),
3944 }
3945 }
3946
3947 #[test]
3948 fn test_name_with_no_name_part() {
3950 match Name::new("space.").map_err(Error::into_details) {
3951 Err(Details::InvalidSchemaName(_, _)) => {}
3952 _ => panic!("Expected an Details::InvalidSchemaName!"),
3953 }
3954 }
3955
3956 #[test]
3957 fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() -> TestResult {
3958 let schema = r#"
3959 {
3960 "name": "record_name",
3961 "namespace": "space",
3962 "type": "record",
3963 "fields": [
3964 {
3965 "name": "outer_field_1",
3966 "type": [
3967 "null",
3968 {
3969 "type":"record",
3970 "name":"inner_record_name",
3971 "fields":[
3972 {
3973 "name":"inner_field_1",
3974 "type":"double"
3975 }
3976 ]
3977 }
3978 ]
3979 },
3980 {
3981 "name": "outer_field_2",
3982 "type" : "inner_record_name"
3983 }
3984 ]
3985 }
3986 "#;
3987 let schema = Schema::parse_str(schema)?;
3988 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
3989 assert_eq!(rs.get_names().len(), 2);
3990 for s in &["space.record_name", "space.inner_record_name"] {
3991 assert!(rs.get_names().contains_key(&Name::new(s)?));
3992 }
3993
3994 Ok(())
3995 }
3996
3997 #[test]
3998 fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() -> TestResult {
3999 let schema = r#"
4000 {
4001 "name": "record_name",
4002 "namespace": "space",
4003 "type": "record",
4004 "fields": [
4005 {
4006 "name": "outer_field_1",
4007 "type": [
4008 "null",
4009 {
4010 "type":"record",
4011 "name":"inner_record_name",
4012 "fields":[
4013 {
4014 "name":"inner_field_1",
4015 "type":"double"
4016 }
4017 ]
4018 }
4019 ]
4020 },
4021 {
4022 "name": "outer_field_2",
4023 "type" : "space.inner_record_name"
4024 }
4025 ]
4026 }
4027 "#;
4028 let schema = Schema::parse_str(schema)?;
4029 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4030 assert_eq!(rs.get_names().len(), 2);
4031 for s in &["space.record_name", "space.inner_record_name"] {
4032 assert!(rs.get_names().contains_key(&Name::new(s)?));
4033 }
4034
4035 Ok(())
4036 }
4037
4038 #[test]
4039 fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() -> TestResult {
4040 let schema = r#"
4041 {
4042 "name": "record_name",
4043 "namespace": "space",
4044 "type": "record",
4045 "fields": [
4046 {
4047 "name": "outer_field_1",
4048 "type": [
4049 "null",
4050 {
4051 "type":"enum",
4052 "name":"inner_enum_name",
4053 "symbols":["Extensive","Testing"]
4054 }
4055 ]
4056 },
4057 {
4058 "name": "outer_field_2",
4059 "type" : "inner_enum_name"
4060 }
4061 ]
4062 }
4063 "#;
4064 let schema = Schema::parse_str(schema)?;
4065 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4066 assert_eq!(rs.get_names().len(), 2);
4067 for s in &["space.record_name", "space.inner_enum_name"] {
4068 assert!(rs.get_names().contains_key(&Name::new(s)?));
4069 }
4070
4071 Ok(())
4072 }
4073
4074 #[test]
4075 fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() -> TestResult {
4076 let schema = r#"
4077 {
4078 "name": "record_name",
4079 "namespace": "space",
4080 "type": "record",
4081 "fields": [
4082 {
4083 "name": "outer_field_1",
4084 "type": [
4085 "null",
4086 {
4087 "type":"enum",
4088 "name":"inner_enum_name",
4089 "symbols":["Extensive","Testing"]
4090 }
4091 ]
4092 },
4093 {
4094 "name": "outer_field_2",
4095 "type" : "space.inner_enum_name"
4096 }
4097 ]
4098 }
4099 "#;
4100 let schema = Schema::parse_str(schema)?;
4101 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4102 assert_eq!(rs.get_names().len(), 2);
4103 for s in &["space.record_name", "space.inner_enum_name"] {
4104 assert!(rs.get_names().contains_key(&Name::new(s)?));
4105 }
4106
4107 Ok(())
4108 }
4109
4110 #[test]
4111 fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() -> TestResult {
4112 let schema = r#"
4113 {
4114 "name": "record_name",
4115 "namespace": "space",
4116 "type": "record",
4117 "fields": [
4118 {
4119 "name": "outer_field_1",
4120 "type": [
4121 "null",
4122 {
4123 "type":"fixed",
4124 "name":"inner_fixed_name",
4125 "size": 16
4126 }
4127 ]
4128 },
4129 {
4130 "name": "outer_field_2",
4131 "type" : "inner_fixed_name"
4132 }
4133 ]
4134 }
4135 "#;
4136 let schema = Schema::parse_str(schema)?;
4137 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4138 assert_eq!(rs.get_names().len(), 2);
4139 for s in &["space.record_name", "space.inner_fixed_name"] {
4140 assert!(rs.get_names().contains_key(&Name::new(s)?));
4141 }
4142
4143 Ok(())
4144 }
4145
4146 #[test]
4147 fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() -> TestResult {
4148 let schema = r#"
4149 {
4150 "name": "record_name",
4151 "namespace": "space",
4152 "type": "record",
4153 "fields": [
4154 {
4155 "name": "outer_field_1",
4156 "type": [
4157 "null",
4158 {
4159 "type":"fixed",
4160 "name":"inner_fixed_name",
4161 "size": 16
4162 }
4163 ]
4164 },
4165 {
4166 "name": "outer_field_2",
4167 "type" : "space.inner_fixed_name"
4168 }
4169 ]
4170 }
4171 "#;
4172 let schema = Schema::parse_str(schema)?;
4173 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4174 assert_eq!(rs.get_names().len(), 2);
4175 for s in &["space.record_name", "space.inner_fixed_name"] {
4176 assert!(rs.get_names().contains_key(&Name::new(s)?));
4177 }
4178
4179 Ok(())
4180 }
4181
4182 #[test]
4183 fn avro_3448_test_proper_resolution_inner_record_inner_namespace() -> TestResult {
4184 let schema = r#"
4185 {
4186 "name": "record_name",
4187 "namespace": "space",
4188 "type": "record",
4189 "fields": [
4190 {
4191 "name": "outer_field_1",
4192 "type": [
4193 "null",
4194 {
4195 "type":"record",
4196 "name":"inner_record_name",
4197 "namespace":"inner_space",
4198 "fields":[
4199 {
4200 "name":"inner_field_1",
4201 "type":"double"
4202 }
4203 ]
4204 }
4205 ]
4206 },
4207 {
4208 "name": "outer_field_2",
4209 "type" : "inner_space.inner_record_name"
4210 }
4211 ]
4212 }
4213 "#;
4214 let schema = Schema::parse_str(schema)?;
4215 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4216 assert_eq!(rs.get_names().len(), 2);
4217 for s in &["space.record_name", "inner_space.inner_record_name"] {
4218 assert!(rs.get_names().contains_key(&Name::new(s)?));
4219 }
4220
4221 Ok(())
4222 }
4223
4224 #[test]
4225 fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() -> TestResult {
4226 let schema = r#"
4227 {
4228 "name": "record_name",
4229 "namespace": "space",
4230 "type": "record",
4231 "fields": [
4232 {
4233 "name": "outer_field_1",
4234 "type": [
4235 "null",
4236 {
4237 "type":"enum",
4238 "name":"inner_enum_name",
4239 "namespace": "inner_space",
4240 "symbols":["Extensive","Testing"]
4241 }
4242 ]
4243 },
4244 {
4245 "name": "outer_field_2",
4246 "type" : "inner_space.inner_enum_name"
4247 }
4248 ]
4249 }
4250 "#;
4251 let schema = Schema::parse_str(schema)?;
4252 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4253 assert_eq!(rs.get_names().len(), 2);
4254 for s in &["space.record_name", "inner_space.inner_enum_name"] {
4255 assert!(rs.get_names().contains_key(&Name::new(s)?));
4256 }
4257
4258 Ok(())
4259 }
4260
4261 #[test]
4262 fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() -> TestResult {
4263 let schema = r#"
4264 {
4265 "name": "record_name",
4266 "namespace": "space",
4267 "type": "record",
4268 "fields": [
4269 {
4270 "name": "outer_field_1",
4271 "type": [
4272 "null",
4273 {
4274 "type":"fixed",
4275 "name":"inner_fixed_name",
4276 "namespace": "inner_space",
4277 "size": 16
4278 }
4279 ]
4280 },
4281 {
4282 "name": "outer_field_2",
4283 "type" : "inner_space.inner_fixed_name"
4284 }
4285 ]
4286 }
4287 "#;
4288 let schema = Schema::parse_str(schema)?;
4289 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4290 assert_eq!(rs.get_names().len(), 2);
4291 for s in &["space.record_name", "inner_space.inner_fixed_name"] {
4292 assert!(rs.get_names().contains_key(&Name::new(s)?));
4293 }
4294
4295 Ok(())
4296 }
4297
4298 #[test]
4299 fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() -> TestResult {
4300 let schema = r#"
4301 {
4302 "name": "record_name",
4303 "namespace": "space",
4304 "type": "record",
4305 "fields": [
4306 {
4307 "name": "outer_field_1",
4308 "type": [
4309 "null",
4310 {
4311 "type":"record",
4312 "name":"middle_record_name",
4313 "fields":[
4314 {
4315 "name":"middle_field_1",
4316 "type":[
4317 "null",
4318 {
4319 "type":"record",
4320 "name":"inner_record_name",
4321 "fields":[
4322 {
4323 "name":"inner_field_1",
4324 "type":"double"
4325 }
4326 ]
4327 }
4328 ]
4329 }
4330 ]
4331 }
4332 ]
4333 },
4334 {
4335 "name": "outer_field_2",
4336 "type" : "space.inner_record_name"
4337 }
4338 ]
4339 }
4340 "#;
4341 let schema = Schema::parse_str(schema)?;
4342 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4343 assert_eq!(rs.get_names().len(), 3);
4344 for s in &[
4345 "space.record_name",
4346 "space.middle_record_name",
4347 "space.inner_record_name",
4348 ] {
4349 assert!(rs.get_names().contains_key(&Name::new(s)?));
4350 }
4351
4352 Ok(())
4353 }
4354
4355 #[test]
4356 fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() -> TestResult {
4357 let schema = r#"
4358 {
4359 "name": "record_name",
4360 "namespace": "space",
4361 "type": "record",
4362 "fields": [
4363 {
4364 "name": "outer_field_1",
4365 "type": [
4366 "null",
4367 {
4368 "type":"record",
4369 "name":"middle_record_name",
4370 "namespace":"middle_namespace",
4371 "fields":[
4372 {
4373 "name":"middle_field_1",
4374 "type":[
4375 "null",
4376 {
4377 "type":"record",
4378 "name":"inner_record_name",
4379 "fields":[
4380 {
4381 "name":"inner_field_1",
4382 "type":"double"
4383 }
4384 ]
4385 }
4386 ]
4387 }
4388 ]
4389 }
4390 ]
4391 },
4392 {
4393 "name": "outer_field_2",
4394 "type" : "middle_namespace.inner_record_name"
4395 }
4396 ]
4397 }
4398 "#;
4399 let schema = Schema::parse_str(schema)?;
4400 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4401 assert_eq!(rs.get_names().len(), 3);
4402 for s in &[
4403 "space.record_name",
4404 "middle_namespace.middle_record_name",
4405 "middle_namespace.inner_record_name",
4406 ] {
4407 assert!(rs.get_names().contains_key(&Name::new(s)?));
4408 }
4409
4410 Ok(())
4411 }
4412
4413 #[test]
4414 fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() -> TestResult {
4415 let schema = r#"
4416 {
4417 "name": "record_name",
4418 "namespace": "space",
4419 "type": "record",
4420 "fields": [
4421 {
4422 "name": "outer_field_1",
4423 "type": [
4424 "null",
4425 {
4426 "type":"record",
4427 "name":"middle_record_name",
4428 "namespace":"middle_namespace",
4429 "fields":[
4430 {
4431 "name":"middle_field_1",
4432 "type":[
4433 "null",
4434 {
4435 "type":"record",
4436 "name":"inner_record_name",
4437 "namespace":"inner_namespace",
4438 "fields":[
4439 {
4440 "name":"inner_field_1",
4441 "type":"double"
4442 }
4443 ]
4444 }
4445 ]
4446 }
4447 ]
4448 }
4449 ]
4450 },
4451 {
4452 "name": "outer_field_2",
4453 "type" : "inner_namespace.inner_record_name"
4454 }
4455 ]
4456 }
4457 "#;
4458 let schema = Schema::parse_str(schema)?;
4459 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4460 assert_eq!(rs.get_names().len(), 3);
4461 for s in &[
4462 "space.record_name",
4463 "middle_namespace.middle_record_name",
4464 "inner_namespace.inner_record_name",
4465 ] {
4466 assert!(rs.get_names().contains_key(&Name::new(s)?));
4467 }
4468
4469 Ok(())
4470 }
4471
4472 #[test]
4473 fn avro_3448_test_proper_in_array_resolution_inherited_namespace() -> TestResult {
4474 let schema = r#"
4475 {
4476 "name": "record_name",
4477 "namespace": "space",
4478 "type": "record",
4479 "fields": [
4480 {
4481 "name": "outer_field_1",
4482 "type": {
4483 "type":"array",
4484 "items":{
4485 "type":"record",
4486 "name":"in_array_record",
4487 "fields": [
4488 {
4489 "name":"array_record_field",
4490 "type":"string"
4491 }
4492 ]
4493 }
4494 }
4495 },
4496 {
4497 "name":"outer_field_2",
4498 "type":"in_array_record"
4499 }
4500 ]
4501 }
4502 "#;
4503 let schema = Schema::parse_str(schema)?;
4504 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4505 assert_eq!(rs.get_names().len(), 2);
4506 for s in &["space.record_name", "space.in_array_record"] {
4507 assert!(rs.get_names().contains_key(&Name::new(s)?));
4508 }
4509
4510 Ok(())
4511 }
4512
4513 #[test]
4514 fn avro_3448_test_proper_in_map_resolution_inherited_namespace() -> TestResult {
4515 let schema = r#"
4516 {
4517 "name": "record_name",
4518 "namespace": "space",
4519 "type": "record",
4520 "fields": [
4521 {
4522 "name": "outer_field_1",
4523 "type": {
4524 "type":"map",
4525 "values":{
4526 "type":"record",
4527 "name":"in_map_record",
4528 "fields": [
4529 {
4530 "name":"map_record_field",
4531 "type":"string"
4532 }
4533 ]
4534 }
4535 }
4536 },
4537 {
4538 "name":"outer_field_2",
4539 "type":"in_map_record"
4540 }
4541 ]
4542 }
4543 "#;
4544 let schema = Schema::parse_str(schema)?;
4545 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4546 assert_eq!(rs.get_names().len(), 2);
4547 for s in &["space.record_name", "space.in_map_record"] {
4548 assert!(rs.get_names().contains_key(&Name::new(s)?));
4549 }
4550
4551 Ok(())
4552 }
4553
4554 #[test]
4555 fn avro_3466_test_to_json_inner_enum_inner_namespace() -> TestResult {
4556 let schema = r#"
4557 {
4558 "name": "record_name",
4559 "namespace": "space",
4560 "type": "record",
4561 "fields": [
4562 {
4563 "name": "outer_field_1",
4564 "type": [
4565 "null",
4566 {
4567 "type":"enum",
4568 "name":"inner_enum_name",
4569 "namespace": "inner_space",
4570 "symbols":["Extensive","Testing"]
4571 }
4572 ]
4573 },
4574 {
4575 "name": "outer_field_2",
4576 "type" : "inner_space.inner_enum_name"
4577 }
4578 ]
4579 }
4580 "#;
4581 let schema = Schema::parse_str(schema)?;
4582 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4583
4584 assert_eq!(rs.get_names().len(), 2);
4586 for s in &["space.record_name", "inner_space.inner_enum_name"] {
4587 assert!(rs.get_names().contains_key(&Name::new(s)?));
4588 }
4589
4590 let schema_str = serde_json::to_string(&schema).expect("test failed");
4592 let _schema = Schema::parse_str(&schema_str).expect("test failed");
4593 assert_eq!(schema, _schema);
4594
4595 Ok(())
4596 }
4597
4598 #[test]
4599 fn avro_3466_test_to_json_inner_fixed_inner_namespace() -> TestResult {
4600 let schema = r#"
4601 {
4602 "name": "record_name",
4603 "namespace": "space",
4604 "type": "record",
4605 "fields": [
4606 {
4607 "name": "outer_field_1",
4608 "type": [
4609 "null",
4610 {
4611 "type":"fixed",
4612 "name":"inner_fixed_name",
4613 "namespace": "inner_space",
4614 "size":54
4615 }
4616 ]
4617 },
4618 {
4619 "name": "outer_field_2",
4620 "type" : "inner_space.inner_fixed_name"
4621 }
4622 ]
4623 }
4624 "#;
4625 let schema = Schema::parse_str(schema)?;
4626 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4627
4628 assert_eq!(rs.get_names().len(), 2);
4630 for s in &["space.record_name", "inner_space.inner_fixed_name"] {
4631 assert!(rs.get_names().contains_key(&Name::new(s)?));
4632 }
4633
4634 let schema_str = serde_json::to_string(&schema).expect("test failed");
4636 let _schema = Schema::parse_str(&schema_str).expect("test failed");
4637 assert_eq!(schema, _schema);
4638
4639 Ok(())
4640 }
4641
4642 fn assert_avro_3512_aliases(aliases: &Aliases) {
4643 match aliases {
4644 Some(aliases) => {
4645 assert_eq!(aliases.len(), 3);
4646 assert_eq!(aliases[0], Alias::new("space.b").unwrap());
4647 assert_eq!(aliases[1], Alias::new("x.y").unwrap());
4648 assert_eq!(aliases[2], Alias::new(".c").unwrap());
4649 }
4650 None => {
4651 panic!("'aliases' must be Some");
4652 }
4653 }
4654 }
4655
4656 #[test]
4657 fn avro_3512_alias_with_null_namespace_record() -> TestResult {
4658 let schema = Schema::parse_str(
4659 r#"
4660 {
4661 "type": "record",
4662 "name": "a",
4663 "namespace": "space",
4664 "aliases": ["b", "x.y", ".c"],
4665 "fields" : [
4666 {"name": "time", "type": "long"}
4667 ]
4668 }
4669 "#,
4670 )?;
4671
4672 if let Schema::Record(RecordSchema { ref aliases, .. }) = schema {
4673 assert_avro_3512_aliases(aliases);
4674 } else {
4675 panic!("The Schema should be a record: {schema:?}");
4676 }
4677
4678 Ok(())
4679 }
4680
4681 #[test]
4682 fn avro_3512_alias_with_null_namespace_enum() -> TestResult {
4683 let schema = Schema::parse_str(
4684 r#"
4685 {
4686 "type": "enum",
4687 "name": "a",
4688 "namespace": "space",
4689 "aliases": ["b", "x.y", ".c"],
4690 "symbols" : [
4691 "symbol1", "symbol2"
4692 ]
4693 }
4694 "#,
4695 )?;
4696
4697 if let Schema::Enum(EnumSchema { ref aliases, .. }) = schema {
4698 assert_avro_3512_aliases(aliases);
4699 } else {
4700 panic!("The Schema should be an enum: {schema:?}");
4701 }
4702
4703 Ok(())
4704 }
4705
4706 #[test]
4707 fn avro_3512_alias_with_null_namespace_fixed() -> TestResult {
4708 let schema = Schema::parse_str(
4709 r#"
4710 {
4711 "type": "fixed",
4712 "name": "a",
4713 "namespace": "space",
4714 "aliases": ["b", "x.y", ".c"],
4715 "size" : 12
4716 }
4717 "#,
4718 )?;
4719
4720 if let Schema::Fixed(FixedSchema { ref aliases, .. }) = schema {
4721 assert_avro_3512_aliases(aliases);
4722 } else {
4723 panic!("The Schema should be a fixed: {schema:?}");
4724 }
4725
4726 Ok(())
4727 }
4728
4729 #[test]
4730 fn avro_3518_serialize_aliases_record() -> TestResult {
4731 let schema = Schema::parse_str(
4732 r#"
4733 {
4734 "type": "record",
4735 "name": "a",
4736 "namespace": "space",
4737 "aliases": ["b", "x.y", ".c"],
4738 "fields" : [
4739 {
4740 "name": "time",
4741 "type": "long",
4742 "doc": "The documentation is not serialized",
4743 "default": 123,
4744 "aliases": ["time1", "ns.time2"]
4745 }
4746 ]
4747 }
4748 "#,
4749 )?;
4750
4751 let value = serde_json::to_value(&schema)?;
4752 let serialized = serde_json::to_string(&value)?;
4753 assert_eq!(
4754 r#"{"aliases":["space.b","x.y","c"],"fields":[{"aliases":["time1","ns.time2"],"default":123,"name":"time","type":"long"}],"name":"a","namespace":"space","type":"record"}"#,
4755 &serialized
4756 );
4757 assert_eq!(schema, Schema::parse_str(&serialized)?);
4758
4759 Ok(())
4760 }
4761
4762 #[test]
4763 fn avro_3518_serialize_aliases_enum() -> TestResult {
4764 let schema = Schema::parse_str(
4765 r#"
4766 {
4767 "type": "enum",
4768 "name": "a",
4769 "namespace": "space",
4770 "aliases": ["b", "x.y", ".c"],
4771 "symbols" : [
4772 "symbol1", "symbol2"
4773 ]
4774 }
4775 "#,
4776 )?;
4777
4778 let value = serde_json::to_value(&schema)?;
4779 let serialized = serde_json::to_string(&value)?;
4780 assert_eq!(
4781 r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","symbols":["symbol1","symbol2"],"type":"enum"}"#,
4782 &serialized
4783 );
4784 assert_eq!(schema, Schema::parse_str(&serialized)?);
4785
4786 Ok(())
4787 }
4788
4789 #[test]
4790 fn avro_3518_serialize_aliases_fixed() -> TestResult {
4791 let schema = Schema::parse_str(
4792 r#"
4793 {
4794 "type": "fixed",
4795 "name": "a",
4796 "namespace": "space",
4797 "aliases": ["b", "x.y", ".c"],
4798 "size" : 12
4799 }
4800 "#,
4801 )?;
4802
4803 let value = serde_json::to_value(&schema)?;
4804 let serialized = serde_json::to_string(&value)?;
4805 assert_eq!(
4806 r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","size":12,"type":"fixed"}"#,
4807 &serialized
4808 );
4809 assert_eq!(schema, Schema::parse_str(&serialized)?);
4810
4811 Ok(())
4812 }
4813
4814 #[test]
4815 fn avro_3130_parse_anonymous_union_type() -> TestResult {
4816 let schema_str = r#"
4817 {
4818 "type": "record",
4819 "name": "AccountEvent",
4820 "fields": [
4821 {"type":
4822 ["null",
4823 { "name": "accountList",
4824 "type": {
4825 "type": "array",
4826 "items": "long"
4827 }
4828 }
4829 ],
4830 "name":"NullableLongArray"
4831 }
4832 ]
4833 }
4834 "#;
4835 let schema = Schema::parse_str(schema_str)?;
4836
4837 if let Schema::Record(RecordSchema { name, fields, .. }) = schema {
4838 assert_eq!(name, Name::new("AccountEvent")?);
4839
4840 let field = &fields[0];
4841 assert_eq!(&field.name, "NullableLongArray");
4842
4843 if let Schema::Union(ref union) = field.schema {
4844 assert_eq!(union.schemas[0], Schema::Null);
4845
4846 if let Schema::Array(ref array_schema) = union.schemas[1] {
4847 if let Schema::Long = *array_schema.items {
4848 } else {
4850 panic!("Expected a Schema::Array of type Long");
4851 }
4852 } else {
4853 panic!("Expected Schema::Array");
4854 }
4855 } else {
4856 panic!("Expected Schema::Union");
4857 }
4858 } else {
4859 panic!("Expected Schema::Record");
4860 }
4861
4862 Ok(())
4863 }
4864
4865 #[test]
4866 fn avro_custom_attributes_schema_without_attributes() -> TestResult {
4867 let schemata_str = [
4868 r#"
4869 {
4870 "type": "record",
4871 "name": "Rec",
4872 "doc": "A Record schema without custom attributes",
4873 "fields": []
4874 }
4875 "#,
4876 r#"
4877 {
4878 "type": "enum",
4879 "name": "Enum",
4880 "doc": "An Enum schema without custom attributes",
4881 "symbols": []
4882 }
4883 "#,
4884 r#"
4885 {
4886 "type": "fixed",
4887 "name": "Fixed",
4888 "doc": "A Fixed schema without custom attributes",
4889 "size": 0
4890 }
4891 "#,
4892 ];
4893 for schema_str in schemata_str.iter() {
4894 let schema = Schema::parse_str(schema_str)?;
4895 assert_eq!(schema.custom_attributes(), Some(&Default::default()));
4896 }
4897
4898 Ok(())
4899 }
4900
4901 const CUSTOM_ATTRS_SUFFIX: &str = r#"
4902 "string_key": "value",
4903 "number_key": 1.23,
4904 "null_key": null,
4905 "array_key": [1, 2, 3],
4906 "object_key": {
4907 "key": "value"
4908 }
4909 "#;
4910
4911 #[test]
4912 fn avro_3609_custom_attributes_schema_with_attributes() -> TestResult {
4913 let schemata_str = [
4914 r#"
4915 {
4916 "type": "record",
4917 "name": "Rec",
4918 "namespace": "ns",
4919 "doc": "A Record schema with custom attributes",
4920 "fields": [],
4921 {{{}}}
4922 }
4923 "#,
4924 r#"
4925 {
4926 "type": "enum",
4927 "name": "Enum",
4928 "namespace": "ns",
4929 "doc": "An Enum schema with custom attributes",
4930 "symbols": [],
4931 {{{}}}
4932 }
4933 "#,
4934 r#"
4935 {
4936 "type": "fixed",
4937 "name": "Fixed",
4938 "namespace": "ns",
4939 "doc": "A Fixed schema with custom attributes",
4940 "size": 2,
4941 {{{}}}
4942 }
4943 "#,
4944 ];
4945
4946 for schema_str in schemata_str.iter() {
4947 let schema = Schema::parse_str(
4948 schema_str
4949 .to_owned()
4950 .replace("{{{}}}", CUSTOM_ATTRS_SUFFIX)
4951 .as_str(),
4952 )?;
4953
4954 assert_eq!(
4955 schema.custom_attributes(),
4956 Some(&expected_custom_attributes())
4957 );
4958 }
4959
4960 Ok(())
4961 }
4962
4963 fn expected_custom_attributes() -> BTreeMap<String, Value> {
4964 let mut expected_attributes: BTreeMap<String, Value> = Default::default();
4965 expected_attributes.insert("string_key".to_string(), Value::String("value".to_string()));
4966 expected_attributes.insert("number_key".to_string(), json!(1.23));
4967 expected_attributes.insert("null_key".to_string(), Value::Null);
4968 expected_attributes.insert(
4969 "array_key".to_string(),
4970 Value::Array(vec![json!(1), json!(2), json!(3)]),
4971 );
4972 let mut object_value: HashMap<String, Value> = HashMap::new();
4973 object_value.insert("key".to_string(), Value::String("value".to_string()));
4974 expected_attributes.insert("object_key".to_string(), json!(object_value));
4975 expected_attributes
4976 }
4977
4978 #[test]
4979 fn avro_3609_custom_attributes_record_field_without_attributes() -> TestResult {
4980 let schema_str = String::from(
4981 r#"
4982 {
4983 "type": "record",
4984 "name": "Rec",
4985 "doc": "A Record schema without custom attributes",
4986 "fields": [
4987 {
4988 "name": "field_one",
4989 "type": "float",
4990 {{{}}}
4991 }
4992 ]
4993 }
4994 "#,
4995 );
4996
4997 let schema = Schema::parse_str(schema_str.replace("{{{}}}", CUSTOM_ATTRS_SUFFIX).as_str())?;
4998
4999 match schema {
5000 Schema::Record(RecordSchema { name, fields, .. }) => {
5001 assert_eq!(name, Name::new("Rec")?);
5002 assert_eq!(fields.len(), 1);
5003 let field = &fields[0];
5004 assert_eq!(&field.name, "field_one");
5005 assert_eq!(field.custom_attributes, expected_custom_attributes());
5006 }
5007 _ => panic!("Expected Schema::Record"),
5008 }
5009
5010 Ok(())
5011 }
5012
5013 #[test]
5014 fn avro_3625_null_is_first() -> TestResult {
5015 let schema_str = String::from(
5016 r#"
5017 {
5018 "type": "record",
5019 "name": "union_schema_test",
5020 "fields": [
5021 {"name": "a", "type": ["null", "long"], "default": null}
5022 ]
5023 }
5024 "#,
5025 );
5026
5027 let schema = Schema::parse_str(&schema_str)?;
5028
5029 match schema {
5030 Schema::Record(RecordSchema { name, fields, .. }) => {
5031 assert_eq!(name, Name::new("union_schema_test")?);
5032 assert_eq!(fields.len(), 1);
5033 let field = &fields[0];
5034 assert_eq!(&field.name, "a");
5035 assert_eq!(&field.default, &Some(Value::Null));
5036 match &field.schema {
5037 Schema::Union(union) => {
5038 assert_eq!(union.variants().len(), 2);
5039 assert!(union.is_nullable());
5040 assert_eq!(union.variants()[0], Schema::Null);
5041 assert_eq!(union.variants()[1], Schema::Long);
5042 }
5043 _ => panic!("Expected Schema::Union"),
5044 }
5045 }
5046 _ => panic!("Expected Schema::Record"),
5047 }
5048
5049 Ok(())
5050 }
5051
5052 #[test]
5053 fn avro_3625_null_is_last() -> TestResult {
5054 let schema_str = String::from(
5055 r#"
5056 {
5057 "type": "record",
5058 "name": "union_schema_test",
5059 "fields": [
5060 {"name": "a", "type": ["long","null"], "default": 123}
5061 ]
5062 }
5063 "#,
5064 );
5065
5066 let schema = Schema::parse_str(&schema_str)?;
5067
5068 match schema {
5069 Schema::Record(RecordSchema { name, fields, .. }) => {
5070 assert_eq!(name, Name::new("union_schema_test")?);
5071 assert_eq!(fields.len(), 1);
5072 let field = &fields[0];
5073 assert_eq!(&field.name, "a");
5074 assert_eq!(&field.default, &Some(json!(123)));
5075 match &field.schema {
5076 Schema::Union(union) => {
5077 assert_eq!(union.variants().len(), 2);
5078 assert_eq!(union.variants()[0], Schema::Long);
5079 assert_eq!(union.variants()[1], Schema::Null);
5080 }
5081 _ => panic!("Expected Schema::Union"),
5082 }
5083 }
5084 _ => panic!("Expected Schema::Record"),
5085 }
5086
5087 Ok(())
5088 }
5089
5090 #[test]
5091 fn avro_3625_null_is_the_middle() -> TestResult {
5092 let schema_str = String::from(
5093 r#"
5094 {
5095 "type": "record",
5096 "name": "union_schema_test",
5097 "fields": [
5098 {"name": "a", "type": ["long","null","int"], "default": 123}
5099 ]
5100 }
5101 "#,
5102 );
5103
5104 let schema = Schema::parse_str(&schema_str)?;
5105
5106 match schema {
5107 Schema::Record(RecordSchema { name, fields, .. }) => {
5108 assert_eq!(name, Name::new("union_schema_test")?);
5109 assert_eq!(fields.len(), 1);
5110 let field = &fields[0];
5111 assert_eq!(&field.name, "a");
5112 assert_eq!(&field.default, &Some(json!(123)));
5113 match &field.schema {
5114 Schema::Union(union) => {
5115 assert_eq!(union.variants().len(), 3);
5116 assert_eq!(union.variants()[0], Schema::Long);
5117 assert_eq!(union.variants()[1], Schema::Null);
5118 assert_eq!(union.variants()[2], Schema::Int);
5119 }
5120 _ => panic!("Expected Schema::Union"),
5121 }
5122 }
5123 _ => panic!("Expected Schema::Record"),
5124 }
5125
5126 Ok(())
5127 }
5128
5129 #[test]
5130 fn avro_3649_default_notintfirst() -> TestResult {
5131 let schema_str = String::from(
5132 r#"
5133 {
5134 "type": "record",
5135 "name": "union_schema_test",
5136 "fields": [
5137 {"name": "a", "type": ["string", "int"], "default": 123}
5138 ]
5139 }
5140 "#,
5141 );
5142
5143 let schema = Schema::parse_str(&schema_str)?;
5144
5145 match schema {
5146 Schema::Record(RecordSchema { name, fields, .. }) => {
5147 assert_eq!(name, Name::new("union_schema_test")?);
5148 assert_eq!(fields.len(), 1);
5149 let field = &fields[0];
5150 assert_eq!(&field.name, "a");
5151 assert_eq!(&field.default, &Some(json!(123)));
5152 match &field.schema {
5153 Schema::Union(union) => {
5154 assert_eq!(union.variants().len(), 2);
5155 assert_eq!(union.variants()[0], Schema::String);
5156 assert_eq!(union.variants()[1], Schema::Int);
5157 }
5158 _ => panic!("Expected Schema::Union"),
5159 }
5160 }
5161 _ => panic!("Expected Schema::Record"),
5162 }
5163
5164 Ok(())
5165 }
5166
5167 #[test]
5168 fn avro_3709_parsing_of_record_field_aliases() -> TestResult {
5169 let schema = r#"
5170 {
5171 "name": "rec",
5172 "type": "record",
5173 "fields": [
5174 {
5175 "name": "num",
5176 "type": "int",
5177 "aliases": ["num1", "num2"]
5178 }
5179 ]
5180 }
5181 "#;
5182
5183 let schema = Schema::parse_str(schema)?;
5184 if let Schema::Record(RecordSchema { fields, .. }) = schema {
5185 let num_field = &fields[0];
5186 assert_eq!(num_field.name, "num");
5187 assert_eq!(num_field.aliases, Some(vec!("num1".into(), "num2".into())));
5188 } else {
5189 panic!("Expected a record schema!");
5190 }
5191
5192 Ok(())
5193 }
5194
5195 #[test]
5196 fn avro_3735_parse_enum_namespace() -> TestResult {
5197 let schema = r#"
5198 {
5199 "type": "record",
5200 "name": "Foo",
5201 "namespace": "name.space",
5202 "fields":
5203 [
5204 {
5205 "name": "barInit",
5206 "type":
5207 {
5208 "type": "enum",
5209 "name": "Bar",
5210 "symbols":
5211 [
5212 "bar0",
5213 "bar1"
5214 ]
5215 }
5216 },
5217 {
5218 "name": "barUse",
5219 "type": "Bar"
5220 }
5221 ]
5222 }
5223 "#;
5224
5225 #[derive(
5226 Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
5227 )]
5228 pub enum Bar {
5229 #[serde(rename = "bar0")]
5230 Bar0,
5231 #[serde(rename = "bar1")]
5232 Bar1,
5233 }
5234
5235 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
5236 pub struct Foo {
5237 #[serde(rename = "barInit")]
5238 pub bar_init: Bar,
5239 #[serde(rename = "barUse")]
5240 pub bar_use: Bar,
5241 }
5242
5243 let schema = Schema::parse_str(schema)?;
5244
5245 let foo = Foo {
5246 bar_init: Bar::Bar0,
5247 bar_use: Bar::Bar1,
5248 };
5249
5250 let avro_value = crate::to_value(foo)?;
5251 assert!(avro_value.validate(&schema));
5252
5253 let mut writer = crate::Writer::new(&schema, Vec::new());
5254
5255 writer.append(avro_value)?;
5257
5258 Ok(())
5259 }
5260
5261 #[test]
5262 fn avro_3755_deserialize() -> TestResult {
5263 #[derive(
5264 Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
5265 )]
5266 pub enum Bar {
5267 #[serde(rename = "bar0")]
5268 Bar0,
5269 #[serde(rename = "bar1")]
5270 Bar1,
5271 #[serde(rename = "bar2")]
5272 Bar2,
5273 }
5274
5275 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
5276 pub struct Foo {
5277 #[serde(rename = "barInit")]
5278 pub bar_init: Bar,
5279 #[serde(rename = "barUse")]
5280 pub bar_use: Bar,
5281 }
5282
5283 let writer_schema = r#"{
5284 "type": "record",
5285 "name": "Foo",
5286 "fields":
5287 [
5288 {
5289 "name": "barInit",
5290 "type":
5291 {
5292 "type": "enum",
5293 "name": "Bar",
5294 "symbols":
5295 [
5296 "bar0",
5297 "bar1"
5298 ]
5299 }
5300 },
5301 {
5302 "name": "barUse",
5303 "type": "Bar"
5304 }
5305 ]
5306 }"#;
5307
5308 let reader_schema = r#"{
5309 "type": "record",
5310 "name": "Foo",
5311 "namespace": "name.space",
5312 "fields":
5313 [
5314 {
5315 "name": "barInit",
5316 "type":
5317 {
5318 "type": "enum",
5319 "name": "Bar",
5320 "symbols":
5321 [
5322 "bar0",
5323 "bar1",
5324 "bar2"
5325 ]
5326 }
5327 },
5328 {
5329 "name": "barUse",
5330 "type": "Bar"
5331 }
5332 ]
5333 }"#;
5334
5335 let writer_schema = Schema::parse_str(writer_schema)?;
5336 let foo = Foo {
5337 bar_init: Bar::Bar0,
5338 bar_use: Bar::Bar1,
5339 };
5340 let avro_value = crate::to_value(foo)?;
5341 assert!(
5342 avro_value.validate(&writer_schema),
5343 "value is valid for schema",
5344 );
5345 let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5346 let mut x = &datum[..];
5347 let reader_schema = Schema::parse_str(reader_schema)?;
5348 let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5349 match deser_value {
5350 types::Value::Record(fields) => {
5351 assert_eq!(fields.len(), 2);
5352 assert_eq!(fields[0].0, "barInit");
5353 assert_eq!(fields[0].1, types::Value::Enum(0, "bar0".to_string()));
5354 assert_eq!(fields[1].0, "barUse");
5355 assert_eq!(fields[1].1, types::Value::Enum(1, "bar1".to_string()));
5356 }
5357 _ => panic!("Expected Value::Record"),
5358 }
5359
5360 Ok(())
5361 }
5362
5363 #[test]
5364 fn test_avro_3780_decimal_schema_type_with_fixed() -> TestResult {
5365 let schema = json!(
5366 {
5367 "type": "record",
5368 "name": "recordWithDecimal",
5369 "fields": [
5370 {
5371 "name": "decimal",
5372 "type": "fixed",
5373 "name": "nestedFixed",
5374 "size": 8,
5375 "logicalType": "decimal",
5376 "precision": 4
5377 }
5378 ]
5379 });
5380
5381 let parse_result = Schema::parse(&schema);
5382 assert!(
5383 parse_result.is_ok(),
5384 "parse result must be ok, got: {parse_result:?}"
5385 );
5386
5387 Ok(())
5388 }
5389
5390 #[test]
5391 fn test_avro_3772_enum_default_wrong_type() -> TestResult {
5392 let schema = r#"
5393 {
5394 "type": "record",
5395 "name": "test",
5396 "fields": [
5397 {"name": "a", "type": "long", "default": 42},
5398 {"name": "b", "type": "string"},
5399 {
5400 "name": "c",
5401 "type": {
5402 "type": "enum",
5403 "name": "suit",
5404 "symbols": ["diamonds", "spades", "clubs", "hearts"],
5405 "default": 123
5406 }
5407 }
5408 ]
5409 }
5410 "#;
5411
5412 match Schema::parse_str(schema) {
5413 Err(err) => {
5414 assert_eq!(
5415 err.to_string(),
5416 "Default value for enum must be a string! Got: 123"
5417 );
5418 }
5419 _ => panic!("Expected an error"),
5420 }
5421 Ok(())
5422 }
5423
5424 #[test]
5425 fn test_avro_3812_handle_null_namespace_properly() -> TestResult {
5426 let schema_str = r#"
5427 {
5428 "namespace": "",
5429 "type": "record",
5430 "name": "my_schema",
5431 "fields": [
5432 {
5433 "name": "a",
5434 "type": {
5435 "type": "enum",
5436 "name": "my_enum",
5437 "namespace": "",
5438 "symbols": ["a", "b"]
5439 }
5440 }, {
5441 "name": "b",
5442 "type": {
5443 "type": "fixed",
5444 "name": "my_fixed",
5445 "namespace": "",
5446 "size": 10
5447 }
5448 }
5449 ]
5450 }
5451 "#;
5452
5453 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"a","type":{"name":"my_enum","type":"enum","symbols":["a","b"]}},{"name":"b","type":{"name":"my_fixed","type":"fixed","size":10}}]}"#;
5454 let schema = Schema::parse_str(schema_str)?;
5455 let canonical_form = schema.canonical_form();
5456 assert_eq!(canonical_form, expected);
5457
5458 let name = Name::new("my_name")?;
5459 let fullname = name.fullname(Some("".to_string()));
5460 assert_eq!(fullname, "my_name");
5461 let qname = name.fully_qualified_name(&Some("".to_string())).to_string();
5462 assert_eq!(qname, "my_name");
5463
5464 Ok(())
5465 }
5466
5467 #[test]
5468 fn test_avro_3818_inherit_enclosing_namespace() -> TestResult {
5469 let schema_str = r#"
5471 {
5472 "namespace": "my_ns",
5473 "type": "record",
5474 "name": "my_schema",
5475 "fields": [
5476 {
5477 "name": "f1",
5478 "type": {
5479 "name": "enum1",
5480 "type": "enum",
5481 "symbols": ["a"]
5482 }
5483 }, {
5484 "name": "f2",
5485 "type": {
5486 "name": "fixed1",
5487 "type": "fixed",
5488 "size": 1
5489 }
5490 }
5491 ]
5492 }
5493 "#;
5494
5495 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"my_ns.fixed1","type":"fixed","size":1}}]}"#;
5496 let schema = Schema::parse_str(schema_str)?;
5497 let canonical_form = schema.canonical_form();
5498 assert_eq!(canonical_form, expected);
5499
5500 let schema_str = r#"
5503 {
5504 "namespace": "my_ns",
5505 "type": "record",
5506 "name": "my_schema",
5507 "fields": [
5508 {
5509 "name": "f1",
5510 "type": {
5511 "name": "enum1",
5512 "type": "enum",
5513 "namespace": "",
5514 "symbols": ["a"]
5515 }
5516 }, {
5517 "name": "f2",
5518 "type": {
5519 "name": "fixed1",
5520 "type": "fixed",
5521 "namespace": "",
5522 "size": 1
5523 }
5524 }
5525 ]
5526 }
5527 "#;
5528
5529 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fixed1","type":"fixed","size":1}}]}"#;
5530 let schema = Schema::parse_str(schema_str)?;
5531 let canonical_form = schema.canonical_form();
5532 assert_eq!(canonical_form, expected);
5533
5534 let schema_str = r#"
5536 {
5537 "namespace": "",
5538 "type": "record",
5539 "name": "my_schema",
5540 "fields": [
5541 {
5542 "name": "f1",
5543 "type": {
5544 "name": "enum1",
5545 "type": "enum",
5546 "namespace": "f1.ns",
5547 "symbols": ["a"]
5548 }
5549 }, {
5550 "name": "f2",
5551 "type": {
5552 "name": "f2.ns.fixed1",
5553 "type": "fixed",
5554 "size": 1
5555 }
5556 }
5557 ]
5558 }
5559 "#;
5560
5561 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"f1.ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"f2.ns.fixed1","type":"fixed","size":1}}]}"#;
5562 let schema = Schema::parse_str(schema_str)?;
5563 let canonical_form = schema.canonical_form();
5564 assert_eq!(canonical_form, expected);
5565
5566 let schema_str = r#"
5568 {
5569 "type": "record",
5570 "name": "my_ns.my_schema",
5571 "fields": [
5572 {
5573 "name": "f1",
5574 "type": {
5575 "name": "inner_record1",
5576 "type": "record",
5577 "fields": [
5578 {
5579 "name": "f1_1",
5580 "type": {
5581 "name": "enum1",
5582 "type": "enum",
5583 "symbols": ["a"]
5584 }
5585 }
5586 ]
5587 }
5588 }, {
5589 "name": "f2",
5590 "type": {
5591 "name": "inner_record2",
5592 "type": "record",
5593 "namespace": "inner_ns",
5594 "fields": [
5595 {
5596 "name": "f2_1",
5597 "type": {
5598 "name": "enum2",
5599 "type": "enum",
5600 "symbols": ["a"]
5601 }
5602 }
5603 ]
5604 }
5605 }
5606 ]
5607 }
5608 "#;
5609
5610 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.inner_record1","type":"record","fields":[{"name":"f1_1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}}]}},{"name":"f2","type":{"name":"inner_ns.inner_record2","type":"record","fields":[{"name":"f2_1","type":{"name":"inner_ns.enum2","type":"enum","symbols":["a"]}}]}}]}"#;
5611 let schema = Schema::parse_str(schema_str)?;
5612 let canonical_form = schema.canonical_form();
5613 assert_eq!(canonical_form, expected);
5614
5615 Ok(())
5616 }
5617
5618 #[test]
5619 fn test_avro_3779_bigdecimal_schema() -> TestResult {
5620 let schema = json!(
5621 {
5622 "name": "decimal",
5623 "type": "bytes",
5624 "logicalType": "big-decimal"
5625 }
5626 );
5627
5628 let parse_result = Schema::parse(&schema);
5629 assert!(
5630 parse_result.is_ok(),
5631 "parse result must be ok, got: {parse_result:?}"
5632 );
5633 match parse_result? {
5634 Schema::BigDecimal => (),
5635 other => panic!("Expected Schema::BigDecimal but got: {other:?}"),
5636 }
5637
5638 Ok(())
5639 }
5640
5641 #[test]
5642 fn test_avro_3820_deny_invalid_field_names() -> TestResult {
5643 let schema_str = r#"
5644 {
5645 "name": "my_record",
5646 "type": "record",
5647 "fields": [
5648 {
5649 "name": "f1.x",
5650 "type": {
5651 "name": "my_enum",
5652 "type": "enum",
5653 "symbols": ["a"]
5654 }
5655 }, {
5656 "name": "f2",
5657 "type": {
5658 "name": "my_fixed",
5659 "type": "fixed",
5660 "size": 1
5661 }
5662 }
5663 ]
5664 }
5665 "#;
5666
5667 match Schema::parse_str(schema_str).map_err(Error::into_details) {
5668 Err(Details::FieldName(x)) if x == "f1.x" => Ok(()),
5669 other => Err(format!("Expected Details::FieldName, got {other:?}").into()),
5670 }
5671 }
5672
5673 #[test]
5674 fn test_avro_3827_disallow_duplicate_field_names() -> TestResult {
5675 let schema_str = r#"
5676 {
5677 "name": "my_schema",
5678 "type": "record",
5679 "fields": [
5680 {
5681 "name": "f1",
5682 "type": {
5683 "name": "a",
5684 "type": "record",
5685 "fields": []
5686 }
5687 }, {
5688 "name": "f1",
5689 "type": {
5690 "name": "b",
5691 "type": "record",
5692 "fields": []
5693 }
5694 }
5695 ]
5696 }
5697 "#;
5698
5699 match Schema::parse_str(schema_str).map_err(Error::into_details) {
5700 Err(Details::FieldNameDuplicate(_)) => (),
5701 other => {
5702 return Err(format!("Expected Details::FieldNameDuplicate, got {other:?}").into());
5703 }
5704 };
5705
5706 let schema_str = r#"
5707 {
5708 "name": "my_schema",
5709 "type": "record",
5710 "fields": [
5711 {
5712 "name": "f1",
5713 "type": {
5714 "name": "a",
5715 "type": "record",
5716 "fields": [
5717 {
5718 "name": "f1",
5719 "type": {
5720 "name": "b",
5721 "type": "record",
5722 "fields": []
5723 }
5724 }
5725 ]
5726 }
5727 }
5728 ]
5729 }
5730 "#;
5731
5732 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"a","type":"record","fields":[{"name":"f1","type":{"name":"b","type":"record","fields":[]}}]}}]}"#;
5733 let schema = Schema::parse_str(schema_str)?;
5734 let canonical_form = schema.canonical_form();
5735 assert_eq!(canonical_form, expected);
5736
5737 Ok(())
5738 }
5739
5740 #[test]
5741 fn test_avro_3830_null_namespace_in_fully_qualified_names() -> TestResult {
5742 let schema_str = r#"
5745 {
5746 "name": ".record1",
5747 "namespace": "ns1",
5748 "type": "record",
5749 "fields": [
5750 {
5751 "name": "f1",
5752 "type": {
5753 "name": ".enum1",
5754 "namespace": "ns2",
5755 "type": "enum",
5756 "symbols": ["a"]
5757 }
5758 }, {
5759 "name": "f2",
5760 "type": {
5761 "name": ".fxed1",
5762 "namespace": "ns3",
5763 "type": "fixed",
5764 "size": 1
5765 }
5766 }
5767 ]
5768 }
5769 "#;
5770
5771 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5772 let schema = Schema::parse_str(schema_str)?;
5773 let canonical_form = schema.canonical_form();
5774 assert_eq!(canonical_form, expected);
5775
5776 let schema_str = r#"
5778 {
5779 "name": ".record1",
5780 "namespace": "ns1",
5781 "type": "record",
5782 "fields": [
5783 {
5784 "name": "f1",
5785 "type": {
5786 "name": "enum1",
5787 "type": "enum",
5788 "symbols": ["a"]
5789 }
5790 }, {
5791 "name": "f2",
5792 "type": {
5793 "name": "fxed1",
5794 "type": "fixed",
5795 "size": 1
5796 }
5797 }
5798 ]
5799 }
5800 "#;
5801
5802 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5803 let schema = Schema::parse_str(schema_str)?;
5804 let canonical_form = schema.canonical_form();
5805 assert_eq!(canonical_form, expected);
5806
5807 let name = Name::new(".my_name")?;
5808 let fullname = name.fullname(None);
5809 assert_eq!(fullname, "my_name");
5810 let qname = name.fully_qualified_name(&None).to_string();
5811 assert_eq!(qname, "my_name");
5812
5813 Ok(())
5814 }
5815
5816 #[test]
5817 fn test_avro_3814_schema_resolution_failure() -> TestResult {
5818 let reader_schema = json!(
5820 {
5821 "type": "record",
5822 "name": "MyOuterRecord",
5823 "fields": [
5824 {
5825 "name": "inner_record",
5826 "type": [
5827 "null",
5828 {
5829 "type": "record",
5830 "name": "MyRecord",
5831 "fields": [
5832 {"name": "a", "type": "string"}
5833 ]
5834 }
5835 ],
5836 "default": null
5837 }
5838 ]
5839 }
5840 );
5841
5842 let writer_schema = json!(
5845 {
5846 "type": "record",
5847 "name": "MyOuterRecord",
5848 "fields": [
5849 {
5850 "name": "inner_record",
5851 "type": [
5852 "null",
5853 {
5854 "type": "record",
5855 "name": "MyRecord",
5856 "fields": [
5857 {"name": "a", "type": "string"},
5858 {
5859 "name": "b",
5860 "type": [
5861 "null",
5862 {
5863 "type": "enum",
5864 "name": "MyEnum",
5865 "symbols": ["A", "B", "C"],
5866 "default": "C"
5867 }
5868 ],
5869 "default": null
5870 },
5871 ]
5872 }
5873 ]
5874 }
5875 ],
5876 "default": null
5877 }
5878 );
5879
5880 #[derive(Serialize, Deserialize, Debug)]
5883 struct MyInnerRecordReader {
5884 a: String,
5885 }
5886
5887 #[derive(Serialize, Deserialize, Debug)]
5888 struct MyRecordReader {
5889 inner_record: Option<MyInnerRecordReader>,
5890 }
5891
5892 #[derive(Serialize, Deserialize, Debug)]
5893 enum MyEnum {
5894 A,
5895 B,
5896 C,
5897 }
5898
5899 #[derive(Serialize, Deserialize, Debug)]
5900 struct MyInnerRecordWriter {
5901 a: String,
5902 b: Option<MyEnum>,
5903 }
5904
5905 #[derive(Serialize, Deserialize, Debug)]
5906 struct MyRecordWriter {
5907 inner_record: Option<MyInnerRecordWriter>,
5908 }
5909
5910 let s = MyRecordWriter {
5911 inner_record: Some(MyInnerRecordWriter {
5912 a: "foo".to_string(),
5913 b: None,
5914 }),
5915 };
5916
5917 let writer_schema = Schema::parse(&writer_schema)?;
5919 let avro_value = crate::to_value(s)?;
5920 assert!(
5921 avro_value.validate(&writer_schema),
5922 "value is valid for schema",
5923 );
5924 let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5925
5926 let reader_schema = Schema::parse(&reader_schema)?;
5928 let mut x = &datum[..];
5929
5930 let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5932 assert!(deser_value.validate(&reader_schema));
5933
5934 let d: MyRecordReader = crate::from_value(&deser_value)?;
5936 assert_eq!(d.inner_record.unwrap().a, "foo".to_string());
5937 Ok(())
5938 }
5939
5940 #[test]
5941 fn test_avro_3837_disallow_invalid_namespace() -> TestResult {
5942 let schema_str = r#"
5944 {
5945 "name": "record1",
5946 "namespace": "ns1",
5947 "type": "record",
5948 "fields": []
5949 }
5950 "#;
5951
5952 let expected = r#"{"name":"ns1.record1","type":"record","fields":[]}"#;
5953 let schema = Schema::parse_str(schema_str)?;
5954 let canonical_form = schema.canonical_form();
5955 assert_eq!(canonical_form, expected);
5956
5957 let schema_str = r#"
5959 {
5960 "name": "enum1",
5961 "namespace": "ns1.foo.bar",
5962 "type": "enum",
5963 "symbols": ["a"]
5964 }
5965 "#;
5966
5967 let expected = r#"{"name":"ns1.foo.bar.enum1","type":"enum","symbols":["a"]}"#;
5968 let schema = Schema::parse_str(schema_str)?;
5969 let canonical_form = schema.canonical_form();
5970 assert_eq!(canonical_form, expected);
5971
5972 let schema_str = r#"
5974 {
5975 "name": "fixed1",
5976 "namespace": ".ns1.a.b",
5977 "type": "fixed",
5978 "size": 1
5979 }
5980 "#;
5981
5982 match Schema::parse_str(schema_str).map_err(Error::into_details) {
5983 Err(Details::InvalidNamespace(_, _)) => (),
5984 other => {
5985 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
5986 }
5987 };
5988
5989 let schema_str = r#"
5991 {
5992 "name": "record1",
5993 "namespace": "ns1.a*b.c",
5994 "type": "record",
5995 "fields": []
5996 }
5997 "#;
5998
5999 match Schema::parse_str(schema_str).map_err(Error::into_details) {
6000 Err(Details::InvalidNamespace(_, _)) => (),
6001 other => {
6002 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6003 }
6004 };
6005
6006 let schema_str = r#"
6008 {
6009 "name": "fixed1",
6010 "namespace": "ns1.1a.b",
6011 "type": "fixed",
6012 "size": 1
6013 }
6014 "#;
6015
6016 match Schema::parse_str(schema_str).map_err(Error::into_details) {
6017 Err(Details::InvalidNamespace(_, _)) => (),
6018 other => {
6019 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6020 }
6021 };
6022
6023 let schema_str = r#"
6025 {
6026 "name": "fixed1",
6027 "namespace": "ns1..a",
6028 "type": "fixed",
6029 "size": 1
6030 }
6031 "#;
6032
6033 match Schema::parse_str(schema_str).map_err(Error::into_details) {
6034 Err(Details::InvalidNamespace(_, _)) => (),
6035 other => {
6036 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6037 }
6038 };
6039
6040 let schema_str = r#"
6042 {
6043 "name": "fixed1",
6044 "namespace": "ns1.a.",
6045 "type": "fixed",
6046 "size": 1
6047 }
6048 "#;
6049
6050 match Schema::parse_str(schema_str).map_err(Error::into_details) {
6051 Err(Details::InvalidNamespace(_, _)) => (),
6052 other => {
6053 return Err(format!("Expected Details::InvalidNamespace, got {other:?}").into());
6054 }
6055 };
6056
6057 Ok(())
6058 }
6059
6060 #[test]
6061 fn test_avro_3851_validate_default_value_of_simple_record_field() -> TestResult {
6062 let schema_str = r#"
6063 {
6064 "name": "record1",
6065 "namespace": "ns",
6066 "type": "record",
6067 "fields": [
6068 {
6069 "name": "f1",
6070 "type": "int",
6071 "default": "invalid"
6072 }
6073 ]
6074 }
6075 "#;
6076 let expected = Details::GetDefaultRecordField(
6077 "f1".to_string(),
6078 "ns.record1".to_string(),
6079 r#""int""#.to_string(),
6080 )
6081 .to_string();
6082 let result = Schema::parse_str(schema_str);
6083 assert!(result.is_err());
6084 let err = result
6085 .map_err(|e| e.to_string())
6086 .err()
6087 .unwrap_or_else(|| "unexpected".to_string());
6088 assert_eq!(expected, err);
6089
6090 Ok(())
6091 }
6092
6093 #[test]
6094 fn test_avro_3851_validate_default_value_of_nested_record_field() -> TestResult {
6095 let schema_str = r#"
6096 {
6097 "name": "record1",
6098 "namespace": "ns",
6099 "type": "record",
6100 "fields": [
6101 {
6102 "name": "f1",
6103 "type": {
6104 "name": "record2",
6105 "type": "record",
6106 "fields": [
6107 {
6108 "name": "f1_1",
6109 "type": "int"
6110 }
6111 ]
6112 },
6113 "default": "invalid"
6114 }
6115 ]
6116 }
6117 "#;
6118 let expected = Details::GetDefaultRecordField(
6119 "f1".to_string(),
6120 "ns.record1".to_string(),
6121 r#"{"name":"ns.record2","type":"record","fields":[{"name":"f1_1","type":"int"}]}"#
6122 .to_string(),
6123 )
6124 .to_string();
6125 let result = Schema::parse_str(schema_str);
6126 assert!(result.is_err());
6127 let err = result
6128 .map_err(|e| e.to_string())
6129 .err()
6130 .unwrap_or_else(|| "unexpected".to_string());
6131 assert_eq!(expected, err);
6132
6133 Ok(())
6134 }
6135
6136 #[test]
6137 fn test_avro_3851_validate_default_value_of_enum_record_field() -> TestResult {
6138 let schema_str = r#"
6139 {
6140 "name": "record1",
6141 "namespace": "ns",
6142 "type": "record",
6143 "fields": [
6144 {
6145 "name": "f1",
6146 "type": {
6147 "name": "enum1",
6148 "type": "enum",
6149 "symbols": ["a", "b", "c"]
6150 },
6151 "default": "invalid"
6152 }
6153 ]
6154 }
6155 "#;
6156 let expected = Details::GetDefaultRecordField(
6157 "f1".to_string(),
6158 "ns.record1".to_string(),
6159 r#"{"name":"ns.enum1","type":"enum","symbols":["a","b","c"]}"#.to_string(),
6160 )
6161 .to_string();
6162 let result = Schema::parse_str(schema_str);
6163 assert!(result.is_err());
6164 let err = result
6165 .map_err(|e| e.to_string())
6166 .err()
6167 .unwrap_or_else(|| "unexpected".to_string());
6168 assert_eq!(expected, err);
6169
6170 Ok(())
6171 }
6172
6173 #[test]
6174 fn test_avro_3851_validate_default_value_of_fixed_record_field() -> TestResult {
6175 let schema_str = r#"
6176 {
6177 "name": "record1",
6178 "namespace": "ns",
6179 "type": "record",
6180 "fields": [
6181 {
6182 "name": "f1",
6183 "type": {
6184 "name": "fixed1",
6185 "type": "fixed",
6186 "size": 3
6187 },
6188 "default": 100
6189 }
6190 ]
6191 }
6192 "#;
6193 let expected = Details::GetDefaultRecordField(
6194 "f1".to_string(),
6195 "ns.record1".to_string(),
6196 r#"{"name":"ns.fixed1","type":"fixed","size":3}"#.to_string(),
6197 )
6198 .to_string();
6199 let result = Schema::parse_str(schema_str);
6200 assert!(result.is_err());
6201 let err = result
6202 .map_err(|e| e.to_string())
6203 .err()
6204 .unwrap_or_else(|| "unexpected".to_string());
6205 assert_eq!(expected, err);
6206
6207 Ok(())
6208 }
6209
6210 #[test]
6211 fn test_avro_3851_validate_default_value_of_array_record_field() -> TestResult {
6212 let schema_str = r#"
6213 {
6214 "name": "record1",
6215 "namespace": "ns",
6216 "type": "record",
6217 "fields": [
6218 {
6219 "name": "f1",
6220 "type": "array",
6221 "items": "int",
6222 "default": "invalid"
6223 }
6224 ]
6225 }
6226 "#;
6227 let expected = Details::GetDefaultRecordField(
6228 "f1".to_string(),
6229 "ns.record1".to_string(),
6230 r#"{"type":"array","items":"int"}"#.to_string(),
6231 )
6232 .to_string();
6233 let result = Schema::parse_str(schema_str);
6234 assert!(result.is_err());
6235 let err = result
6236 .map_err(|e| e.to_string())
6237 .err()
6238 .unwrap_or_else(|| "unexpected".to_string());
6239 assert_eq!(expected, err);
6240
6241 Ok(())
6242 }
6243
6244 #[test]
6245 fn test_avro_3851_validate_default_value_of_map_record_field() -> TestResult {
6246 let schema_str = r#"
6247 {
6248 "name": "record1",
6249 "namespace": "ns",
6250 "type": "record",
6251 "fields": [
6252 {
6253 "name": "f1",
6254 "type": "map",
6255 "values": "string",
6256 "default": "invalid"
6257 }
6258 ]
6259 }
6260 "#;
6261 let expected = Details::GetDefaultRecordField(
6262 "f1".to_string(),
6263 "ns.record1".to_string(),
6264 r#"{"type":"map","values":"string"}"#.to_string(),
6265 )
6266 .to_string();
6267 let result = Schema::parse_str(schema_str);
6268 assert!(result.is_err());
6269 let err = result
6270 .map_err(|e| e.to_string())
6271 .err()
6272 .unwrap_or_else(|| "unexpected".to_string());
6273 assert_eq!(expected, err);
6274
6275 Ok(())
6276 }
6277
6278 #[test]
6279 fn test_avro_3851_validate_default_value_of_ref_record_field() -> TestResult {
6280 let schema_str = r#"
6281 {
6282 "name": "record1",
6283 "namespace": "ns",
6284 "type": "record",
6285 "fields": [
6286 {
6287 "name": "f1",
6288 "type": {
6289 "name": "record2",
6290 "type": "record",
6291 "fields": [
6292 {
6293 "name": "f1_1",
6294 "type": "int"
6295 }
6296 ]
6297 }
6298 }, {
6299 "name": "f2",
6300 "type": "ns.record2",
6301 "default": { "f1_1": true }
6302 }
6303 ]
6304 }
6305 "#;
6306 let expected = Details::GetDefaultRecordField(
6307 "f2".to_string(),
6308 "ns.record1".to_string(),
6309 r#""ns.record2""#.to_string(),
6310 )
6311 .to_string();
6312 let result = Schema::parse_str(schema_str);
6313 assert!(result.is_err());
6314 let err = result
6315 .map_err(|e| e.to_string())
6316 .err()
6317 .unwrap_or_else(|| "unexpected".to_string());
6318 assert_eq!(expected, err);
6319
6320 Ok(())
6321 }
6322
6323 #[test]
6324 fn test_avro_3851_validate_default_value_of_enum() -> TestResult {
6325 let schema_str = r#"
6326 {
6327 "name": "enum1",
6328 "namespace": "ns",
6329 "type": "enum",
6330 "symbols": ["a", "b", "c"],
6331 "default": 100
6332 }
6333 "#;
6334 let expected = Details::EnumDefaultWrongType(100.into()).to_string();
6335 let result = Schema::parse_str(schema_str);
6336 assert!(result.is_err());
6337 let err = result
6338 .map_err(|e| e.to_string())
6339 .err()
6340 .unwrap_or_else(|| "unexpected".to_string());
6341 assert_eq!(expected, err);
6342
6343 let schema_str = r#"
6344 {
6345 "name": "enum1",
6346 "namespace": "ns",
6347 "type": "enum",
6348 "symbols": ["a", "b", "c"],
6349 "default": "d"
6350 }
6351 "#;
6352 let expected = Details::GetEnumDefault {
6353 symbol: "d".to_string(),
6354 symbols: vec!["a".to_string(), "b".to_string(), "c".to_string()],
6355 }
6356 .to_string();
6357 let result = Schema::parse_str(schema_str);
6358 assert!(result.is_err());
6359 let err = result
6360 .map_err(|e| e.to_string())
6361 .err()
6362 .unwrap_or_else(|| "unexpected".to_string());
6363 assert_eq!(expected, err);
6364
6365 Ok(())
6366 }
6367
6368 #[test]
6369 fn test_avro_3862_get_aliases() -> TestResult {
6370 let schema_str = r#"
6372 {
6373 "name": "record1",
6374 "namespace": "ns1",
6375 "type": "record",
6376 "aliases": ["r1", "ns2.r2"],
6377 "fields": [
6378 { "name": "f1", "type": "int" },
6379 { "name": "f2", "type": "string" }
6380 ]
6381 }
6382 "#;
6383 let schema = Schema::parse_str(schema_str)?;
6384 let expected = vec![Alias::new("ns1.r1")?, Alias::new("ns2.r2")?];
6385 match schema.aliases() {
6386 Some(aliases) => assert_eq!(aliases, &expected),
6387 None => panic!("Expected Some({expected:?}), got None"),
6388 }
6389
6390 let schema_str = r#"
6391 {
6392 "name": "record1",
6393 "namespace": "ns1",
6394 "type": "record",
6395 "fields": [
6396 { "name": "f1", "type": "int" },
6397 { "name": "f2", "type": "string" }
6398 ]
6399 }
6400 "#;
6401 let schema = Schema::parse_str(schema_str)?;
6402 match schema.aliases() {
6403 None => (),
6404 some => panic!("Expected None, got {some:?}"),
6405 }
6406
6407 let schema_str = r#"
6409 {
6410 "name": "enum1",
6411 "namespace": "ns1",
6412 "type": "enum",
6413 "aliases": ["en1", "ns2.en2"],
6414 "symbols": ["a", "b", "c"]
6415 }
6416 "#;
6417 let schema = Schema::parse_str(schema_str)?;
6418 let expected = vec![Alias::new("ns1.en1")?, Alias::new("ns2.en2")?];
6419 match schema.aliases() {
6420 Some(aliases) => assert_eq!(aliases, &expected),
6421 None => panic!("Expected Some({expected:?}), got None"),
6422 }
6423
6424 let schema_str = r#"
6425 {
6426 "name": "enum1",
6427 "namespace": "ns1",
6428 "type": "enum",
6429 "symbols": ["a", "b", "c"]
6430 }
6431 "#;
6432 let schema = Schema::parse_str(schema_str)?;
6433 match schema.aliases() {
6434 None => (),
6435 some => panic!("Expected None, got {some:?}"),
6436 }
6437
6438 let schema_str = r#"
6440 {
6441 "name": "fixed1",
6442 "namespace": "ns1",
6443 "type": "fixed",
6444 "aliases": ["fx1", "ns2.fx2"],
6445 "size": 10
6446 }
6447 "#;
6448 let schema = Schema::parse_str(schema_str)?;
6449 let expected = vec![Alias::new("ns1.fx1")?, Alias::new("ns2.fx2")?];
6450 match schema.aliases() {
6451 Some(aliases) => assert_eq!(aliases, &expected),
6452 None => panic!("Expected Some({expected:?}), got None"),
6453 }
6454
6455 let schema_str = r#"
6456 {
6457 "name": "fixed1",
6458 "namespace": "ns1",
6459 "type": "fixed",
6460 "size": 10
6461 }
6462 "#;
6463 let schema = Schema::parse_str(schema_str)?;
6464 match schema.aliases() {
6465 None => (),
6466 some => panic!("Expected None, got {some:?}"),
6467 }
6468
6469 let schema = Schema::Int;
6471 match schema.aliases() {
6472 None => (),
6473 some => panic!("Expected None, got {some:?}"),
6474 }
6475
6476 Ok(())
6477 }
6478
6479 #[test]
6480 fn test_avro_3862_get_doc() -> TestResult {
6481 let schema_str = r#"
6483 {
6484 "name": "record1",
6485 "type": "record",
6486 "doc": "Record Document",
6487 "fields": [
6488 { "name": "f1", "type": "int" },
6489 { "name": "f2", "type": "string" }
6490 ]
6491 }
6492 "#;
6493 let schema = Schema::parse_str(schema_str)?;
6494 let expected = "Record Document";
6495 match schema.doc() {
6496 Some(doc) => assert_eq!(doc, expected),
6497 None => panic!("Expected Some({expected:?}), got None"),
6498 }
6499
6500 let schema_str = r#"
6501 {
6502 "name": "record1",
6503 "type": "record",
6504 "fields": [
6505 { "name": "f1", "type": "int" },
6506 { "name": "f2", "type": "string" }
6507 ]
6508 }
6509 "#;
6510 let schema = Schema::parse_str(schema_str)?;
6511 match schema.doc() {
6512 None => (),
6513 some => panic!("Expected None, got {some:?}"),
6514 }
6515
6516 let schema_str = r#"
6518 {
6519 "name": "enum1",
6520 "type": "enum",
6521 "doc": "Enum Document",
6522 "symbols": ["a", "b", "c"]
6523 }
6524 "#;
6525 let schema = Schema::parse_str(schema_str)?;
6526 let expected = "Enum Document";
6527 match schema.doc() {
6528 Some(doc) => assert_eq!(doc, expected),
6529 None => panic!("Expected Some({expected:?}), got None"),
6530 }
6531
6532 let schema_str = r#"
6533 {
6534 "name": "enum1",
6535 "type": "enum",
6536 "symbols": ["a", "b", "c"]
6537 }
6538 "#;
6539 let schema = Schema::parse_str(schema_str)?;
6540 match schema.doc() {
6541 None => (),
6542 some => panic!("Expected None, got {some:?}"),
6543 }
6544
6545 let schema_str = r#"
6547 {
6548 "name": "fixed1",
6549 "type": "fixed",
6550 "doc": "Fixed Document",
6551 "size": 10
6552 }
6553 "#;
6554 let schema = Schema::parse_str(schema_str)?;
6555 let expected = "Fixed Document";
6556 match schema.doc() {
6557 Some(doc) => assert_eq!(doc, expected),
6558 None => panic!("Expected Some({expected:?}), got None"),
6559 }
6560
6561 let schema_str = r#"
6562 {
6563 "name": "fixed1",
6564 "type": "fixed",
6565 "size": 10
6566 }
6567 "#;
6568 let schema = Schema::parse_str(schema_str)?;
6569 match schema.doc() {
6570 None => (),
6571 some => panic!("Expected None, got {some:?}"),
6572 }
6573
6574 let schema = Schema::Int;
6576 match schema.doc() {
6577 None => (),
6578 some => panic!("Expected None, got {some:?}"),
6579 }
6580
6581 Ok(())
6582 }
6583
6584 #[test]
6585 fn avro_3886_serialize_attributes() -> TestResult {
6586 let attributes = BTreeMap::from([
6587 ("string_key".into(), "value".into()),
6588 ("number_key".into(), 1.23.into()),
6589 ("null_key".into(), Value::Null),
6590 (
6591 "array_key".into(),
6592 Value::Array(vec![1.into(), 2.into(), 3.into()]),
6593 ),
6594 ("object_key".into(), Value::Object(Map::default())),
6595 ]);
6596
6597 let schema = Schema::Enum(EnumSchema {
6599 name: Name::new("a")?,
6600 aliases: None,
6601 doc: None,
6602 symbols: vec![],
6603 default: None,
6604 attributes: attributes.clone(),
6605 });
6606 let serialized = serde_json::to_string(&schema)?;
6607 assert_eq!(
6608 r#"{"type":"enum","name":"a","symbols":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6609 &serialized
6610 );
6611
6612 let schema = Schema::Fixed(FixedSchema {
6614 name: Name::new("a")?,
6615 aliases: None,
6616 doc: None,
6617 size: 1,
6618 default: None,
6619 attributes: attributes.clone(),
6620 });
6621 let serialized = serde_json::to_string(&schema)?;
6622 assert_eq!(
6623 r#"{"type":"fixed","name":"a","size":1,"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6624 &serialized
6625 );
6626
6627 let schema = Schema::Record(RecordSchema {
6629 name: Name::new("a")?,
6630 aliases: None,
6631 doc: None,
6632 fields: vec![],
6633 lookup: BTreeMap::new(),
6634 attributes,
6635 });
6636 let serialized = serde_json::to_string(&schema)?;
6637 assert_eq!(
6638 r#"{"type":"record","name":"a","fields":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6639 &serialized
6640 );
6641
6642 Ok(())
6643 }
6644
6645 #[test]
6648 fn test_avro_3897_funny_valid_names_and_namespaces() -> TestResult {
6649 for funny_name in ["_", "_._", "__._", "_.__", "_._._"] {
6650 let name = Name::new(funny_name);
6651 assert!(name.is_ok());
6652 }
6653 Ok(())
6654 }
6655
6656 #[test]
6657 fn test_avro_3896_decimal_schema() -> TestResult {
6658 let schema = json!(
6660 {
6661 "type": "bytes",
6662 "name": "BytesDecimal",
6663 "logicalType": "decimal",
6664 "size": 38,
6665 "precision": 9,
6666 "scale": 2
6667 });
6668 let parse_result = Schema::parse(&schema)?;
6669 assert!(matches!(
6670 parse_result,
6671 Schema::Decimal(DecimalSchema {
6672 precision: 9,
6673 scale: 2,
6674 ..
6675 })
6676 ));
6677
6678 let schema = json!(
6680 {
6681 "type": "long",
6682 "name": "LongDecimal",
6683 "logicalType": "decimal"
6684 });
6685 let parse_result = Schema::parse(&schema)?;
6686 assert_eq!(parse_result, Schema::Long);
6688
6689 Ok(())
6690 }
6691
6692 #[test]
6693 fn avro_3896_uuid_schema_for_string() -> TestResult {
6694 let schema = json!(
6696 {
6697 "type": "string",
6698 "name": "StringUUID",
6699 "logicalType": "uuid"
6700 });
6701 let parse_result = Schema::parse(&schema)?;
6702 assert_eq!(parse_result, Schema::Uuid);
6703
6704 Ok(())
6705 }
6706
6707 #[test]
6708 #[serial(serde_is_human_readable)]
6709 fn avro_rs_53_uuid_with_fixed() -> TestResult {
6710 #[derive(Debug, Serialize, Deserialize)]
6711 struct Comment {
6712 id: crate::Uuid,
6713 }
6714
6715 impl AvroSchema for Comment {
6716 fn get_schema() -> Schema {
6717 Schema::parse_str(
6718 r#"{
6719 "type" : "record",
6720 "name" : "Comment",
6721 "fields" : [ {
6722 "name" : "id",
6723 "type" : {
6724 "type" : "fixed",
6725 "size" : 16,
6726 "logicalType" : "uuid",
6727 "name": "FixedUUID"
6728 }
6729 } ]
6730 }"#,
6731 )
6732 .expect("Invalid Comment Avro schema")
6733 }
6734 }
6735
6736 let payload = Comment {
6737 id: "de2df598-9948-4988-b00a-a41c0e287398".parse()?,
6738 };
6739 let mut buffer = Vec::new();
6740
6741 crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);
6743 let bytes = SpecificSingleObjectWriter::<Comment>::with_capacity(64)?
6744 .write_ref(&payload, &mut buffer)?;
6745 assert_eq!(bytes, 47);
6746
6747 crate::util::SERDE_HUMAN_READABLE.store(false, Ordering::Release);
6749 let bytes = SpecificSingleObjectWriter::<Comment>::with_capacity(64)?
6750 .write_ref(&payload, &mut buffer)?;
6751 assert_eq!(bytes, 27);
6752
6753 Ok(())
6754 }
6755
6756 #[test]
6757 fn avro_3926_uuid_schema_for_fixed_with_size_16() -> TestResult {
6758 let schema = json!(
6759 {
6760 "type": "fixed",
6761 "name": "FixedUUID",
6762 "size": 16,
6763 "logicalType": "uuid"
6764 });
6765 let parse_result = Schema::parse(&schema)?;
6766 assert_eq!(parse_result, Schema::Uuid);
6767 assert_not_logged(
6768 r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, attributes: {"logicalType": String("uuid")} })"#,
6769 );
6770
6771 Ok(())
6772 }
6773
6774 #[test]
6775 fn avro_3926_uuid_schema_for_fixed_with_size_different_than_16() -> TestResult {
6776 let schema = json!(
6777 {
6778 "type": "fixed",
6779 "name": "FixedUUID",
6780 "size": 6,
6781 "logicalType": "uuid"
6782 });
6783 let parse_result = Schema::parse(&schema)?;
6784
6785 assert_eq!(
6786 parse_result,
6787 Schema::Fixed(FixedSchema {
6788 name: Name::new("FixedUUID")?,
6789 aliases: None,
6790 doc: None,
6791 size: 6,
6792 default: None,
6793 attributes: BTreeMap::from([("logicalType".to_string(), "uuid".into())]),
6794 })
6795 );
6796 assert_logged(
6797 r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, default: None, attributes: {"logicalType": String("uuid")} })"#,
6798 );
6799
6800 Ok(())
6801 }
6802
6803 #[test]
6804 fn test_avro_3896_timestamp_millis_schema() -> TestResult {
6805 let schema = json!(
6807 {
6808 "type": "long",
6809 "name": "LongTimestampMillis",
6810 "logicalType": "timestamp-millis"
6811 });
6812 let parse_result = Schema::parse(&schema)?;
6813 assert_eq!(parse_result, Schema::TimestampMillis);
6814
6815 let schema = json!(
6817 {
6818 "type": "int",
6819 "name": "IntTimestampMillis",
6820 "logicalType": "timestamp-millis"
6821 });
6822 let parse_result = Schema::parse(&schema)?;
6823 assert_eq!(parse_result, Schema::Int);
6824
6825 Ok(())
6826 }
6827
6828 #[test]
6829 fn test_avro_3896_custom_bytes_schema() -> TestResult {
6830 let schema = json!(
6832 {
6833 "type": "bytes",
6834 "name": "BytesLog",
6835 "logicalType": "custom"
6836 });
6837 let parse_result = Schema::parse(&schema)?;
6838 assert_eq!(parse_result, Schema::Bytes);
6839 assert_eq!(parse_result.custom_attributes(), None);
6840
6841 Ok(())
6842 }
6843
6844 #[test]
6845 fn test_avro_3899_parse_decimal_type() -> TestResult {
6846 let schema = Schema::parse_str(
6847 r#"{
6848 "name": "InvalidDecimal",
6849 "type": "fixed",
6850 "size": 16,
6851 "logicalType": "decimal",
6852 "precision": 2,
6853 "scale": 3
6854 }"#,
6855 )?;
6856 match schema {
6857 Schema::Fixed(fixed_schema) => {
6858 let attrs = fixed_schema.attributes;
6859 let precision = attrs
6860 .get("precision")
6861 .expect("The 'precision' attribute is missing");
6862 let scale = attrs
6863 .get("scale")
6864 .expect("The 'scale' attribute is missing");
6865 assert_logged(&format!(
6866 "Ignoring invalid decimal logical type: The decimal precision ({precision}) must be bigger or equal to the scale ({scale})"
6867 ));
6868 }
6869 _ => unreachable!("Expected Schema::Fixed, got {:?}", schema),
6870 }
6871
6872 let schema = Schema::parse_str(
6873 r#"{
6874 "name": "ValidDecimal",
6875 "type": "bytes",
6876 "logicalType": "decimal",
6877 "precision": 3,
6878 "scale": 2
6879 }"#,
6880 )?;
6881 match schema {
6882 Schema::Decimal(_) => {
6883 assert_not_logged(
6884 "Ignoring invalid decimal logical type: The decimal precision (2) must be bigger or equal to the scale (3)",
6885 );
6886 }
6887 _ => unreachable!("Expected Schema::Decimal, got {:?}", schema),
6888 }
6889
6890 Ok(())
6891 }
6892
6893 #[test]
6894 fn avro_3920_serialize_record_with_custom_attributes() -> TestResult {
6895 let expected = {
6896 let mut lookup = BTreeMap::new();
6897 lookup.insert("value".to_owned(), 0);
6898 Schema::Record(RecordSchema {
6899 name: Name {
6900 name: "LongList".to_owned(),
6901 namespace: None,
6902 },
6903 aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
6904 doc: None,
6905 fields: vec![RecordField {
6906 name: "value".to_string(),
6907 doc: None,
6908 default: None,
6909 aliases: None,
6910 schema: Schema::Long,
6911 order: RecordFieldOrder::Ascending,
6912 position: 0,
6913 custom_attributes: BTreeMap::from([("field-id".to_string(), 1.into())]),
6914 }],
6915 lookup,
6916 attributes: BTreeMap::from([("custom-attribute".to_string(), "value".into())]),
6917 })
6918 };
6919
6920 let value = serde_json::to_value(&expected)?;
6921 let serialized = serde_json::to_string(&value)?;
6922 assert_eq!(
6923 r#"{"aliases":["LinkedLongs"],"custom-attribute":"value","fields":[{"field-id":1,"name":"value","type":"long"}],"name":"LongList","type":"record"}"#,
6924 &serialized
6925 );
6926 assert_eq!(expected, Schema::parse_str(&serialized)?);
6927
6928 Ok(())
6929 }
6930
6931 #[test]
6932 fn test_avro_3925_serialize_decimal_inner_fixed() -> TestResult {
6933 let schema = Schema::Decimal(DecimalSchema {
6934 precision: 36,
6935 scale: 10,
6936 inner: Box::new(Schema::Fixed(FixedSchema {
6937 name: Name::new("decimal_36_10").unwrap(),
6938 aliases: None,
6939 doc: None,
6940 size: 16,
6941 default: None,
6942 attributes: Default::default(),
6943 })),
6944 });
6945
6946 let serialized_json = serde_json::to_string_pretty(&schema)?;
6947
6948 let expected_json = r#"{
6949 "type": "fixed",
6950 "name": "decimal_36_10",
6951 "size": 16,
6952 "logicalType": "decimal",
6953 "scale": 10,
6954 "precision": 36
6955}"#;
6956
6957 assert_eq!(serialized_json, expected_json);
6958
6959 Ok(())
6960 }
6961
6962 #[test]
6963 fn test_avro_3925_serialize_decimal_inner_bytes() -> TestResult {
6964 let schema = Schema::Decimal(DecimalSchema {
6965 precision: 36,
6966 scale: 10,
6967 inner: Box::new(Schema::Bytes),
6968 });
6969
6970 let serialized_json = serde_json::to_string_pretty(&schema)?;
6971
6972 let expected_json = r#"{
6973 "type": "bytes",
6974 "logicalType": "decimal",
6975 "scale": 10,
6976 "precision": 36
6977}"#;
6978
6979 assert_eq!(serialized_json, expected_json);
6980
6981 Ok(())
6982 }
6983
6984 #[test]
6985 fn test_avro_3925_serialize_decimal_inner_invalid() -> TestResult {
6986 let schema = Schema::Decimal(DecimalSchema {
6987 precision: 36,
6988 scale: 10,
6989 inner: Box::new(Schema::String),
6990 });
6991
6992 let serialized_json = serde_json::to_string_pretty(&schema);
6993
6994 assert!(serialized_json.is_err());
6995
6996 Ok(())
6997 }
6998
6999 #[test]
7000 fn test_avro_3927_serialize_array_with_custom_attributes() -> TestResult {
7001 let expected = Schema::array_with_attributes(
7002 Schema::Long,
7003 BTreeMap::from([("field-id".to_string(), "1".into())]),
7004 );
7005
7006 let value = serde_json::to_value(&expected)?;
7007 let serialized = serde_json::to_string(&value)?;
7008 assert_eq!(
7009 r#"{"field-id":"1","items":"long","type":"array"}"#,
7010 &serialized
7011 );
7012 let actual_schema = Schema::parse_str(&serialized)?;
7013 assert_eq!(expected, actual_schema);
7014 assert_eq!(
7015 expected.custom_attributes(),
7016 actual_schema.custom_attributes()
7017 );
7018
7019 Ok(())
7020 }
7021
7022 #[test]
7023 fn test_avro_3927_serialize_map_with_custom_attributes() -> TestResult {
7024 let expected = Schema::map_with_attributes(
7025 Schema::Long,
7026 BTreeMap::from([("field-id".to_string(), "1".into())]),
7027 );
7028
7029 let value = serde_json::to_value(&expected)?;
7030 let serialized = serde_json::to_string(&value)?;
7031 assert_eq!(
7032 r#"{"field-id":"1","type":"map","values":"long"}"#,
7033 &serialized
7034 );
7035 let actual_schema = Schema::parse_str(&serialized)?;
7036 assert_eq!(expected, actual_schema);
7037 assert_eq!(
7038 expected.custom_attributes(),
7039 actual_schema.custom_attributes()
7040 );
7041
7042 Ok(())
7043 }
7044
7045 #[test]
7046 fn avro_3928_parse_int_based_schema_with_default() -> TestResult {
7047 let schema = r#"
7048 {
7049 "type": "record",
7050 "name": "DateLogicalType",
7051 "fields": [ {
7052 "name": "birthday",
7053 "type": {"type": "int", "logicalType": "date"},
7054 "default": 1681601653
7055 } ]
7056 }"#;
7057
7058 match Schema::parse_str(schema)? {
7059 Schema::Record(record_schema) => {
7060 assert_eq!(record_schema.fields.len(), 1);
7061 let field = record_schema.fields.first().unwrap();
7062 assert_eq!(field.name, "birthday");
7063 assert_eq!(field.schema, Schema::Date);
7064 assert_eq!(
7065 types::Value::from(field.default.clone().unwrap()),
7066 types::Value::Int(1681601653)
7067 );
7068 }
7069 _ => unreachable!("Expected Schema::Record"),
7070 }
7071
7072 Ok(())
7073 }
7074
7075 #[test]
7076 fn avro_3946_union_with_single_type() -> TestResult {
7077 let schema = r#"
7078 {
7079 "type": "record",
7080 "name": "Issue",
7081 "namespace": "invalid.example",
7082 "fields": [
7083 {
7084 "name": "myField",
7085 "type": ["long"]
7086 }
7087 ]
7088 }"#;
7089
7090 let _ = Schema::parse_str(schema)?;
7091
7092 assert_logged(
7093 "Union schema with just one member! Consider dropping the union! \
7094 Please enable debug logging to find out which Record schema \
7095 declares the union with 'RUST_LOG=apache_avro::schema=debug'.",
7096 );
7097
7098 Ok(())
7099 }
7100
7101 #[test]
7102 fn avro_3946_union_without_any_types() -> TestResult {
7103 let schema = r#"
7104 {
7105 "type": "record",
7106 "name": "Issue",
7107 "namespace": "invalid.example",
7108 "fields": [
7109 {
7110 "name": "myField",
7111 "type": []
7112 }
7113 ]
7114 }"#;
7115
7116 let _ = Schema::parse_str(schema)?;
7117
7118 assert_logged(
7119 "Union schemas should have at least two members! \
7120 Please enable debug logging to find out which Record schema \
7121 declares the union with 'RUST_LOG=apache_avro::schema=debug'.",
7122 );
7123
7124 Ok(())
7125 }
7126
7127 #[test]
7128 fn avro_3965_fixed_schema_with_default_bigger_than_size() -> TestResult {
7129 match Schema::parse_str(
7130 r#"{
7131 "type": "fixed",
7132 "name": "test",
7133 "size": 1,
7134 "default": "123456789"
7135 }"#,
7136 ) {
7137 Ok(_schema) => panic!("Must fail!"),
7138 Err(err) => {
7139 assert_eq!(
7140 err.to_string(),
7141 "Fixed schema's default value length (9) does not match its size (1)"
7142 );
7143 }
7144 }
7145
7146 Ok(())
7147 }
7148
7149 #[test]
7150 fn avro_4004_canonical_form_strip_logical_types() -> TestResult {
7151 let schema_str = r#"
7152 {
7153 "type": "record",
7154 "name": "test",
7155 "fields": [
7156 {"name": "a", "type": "long", "default": 42, "doc": "The field a"},
7157 {"name": "b", "type": "string", "namespace": "test.a"},
7158 {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
7159 ]
7160 }"#;
7161
7162 let schema = Schema::parse_str(schema_str)?;
7163 let canonical_form = schema.canonical_form();
7164 let fp_rabin = schema.fingerprint::<Rabin>();
7165 assert_eq!(
7166 r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"},{"name":"c","type":{"type":"long"}}]}"#,
7167 canonical_form
7168 );
7169 assert_eq!("92f2ccef718c6754", fp_rabin.to_string());
7170 Ok(())
7171 }
7172
7173 #[test]
7174 fn avro_4055_should_fail_to_parse_invalid_schema() -> TestResult {
7175 let invalid_schema_str = r#"
7177 {
7178 "type": "record",
7179 "name": "SampleSchema",
7180 "fields": [
7181 {
7182 "name": "order",
7183 "type": "record",
7184 "fields": [
7185 {
7186 "name": "order_number",
7187 "type": ["null", "string"],
7188 "default": null
7189 },
7190 { "name": "order_date", "type": "string" }
7191 ]
7192 }
7193 ]
7194 }"#;
7195
7196 let schema = Schema::parse_str(invalid_schema_str);
7197 assert!(schema.is_err());
7198 assert_eq!(
7199 schema.unwrap_err().to_string(),
7200 "Invalid schema: There is no type called 'record', if you meant to define a non-primitive schema, it should be defined inside `type` attribute. Please review the specification"
7201 );
7202
7203 let valid_schema = r#"
7204 {
7205 "type": "record",
7206 "name": "SampleSchema",
7207 "fields": [
7208 {
7209 "name": "order",
7210 "type": {
7211 "type": "record",
7212 "name": "Order",
7213 "fields": [
7214 {
7215 "name": "order_number",
7216 "type": ["null", "string"],
7217 "default": null
7218 },
7219 { "name": "order_date", "type": "string" }
7220 ]
7221 }
7222 }
7223 ]
7224 }"#;
7225 let schema = Schema::parse_str(valid_schema);
7226 assert!(schema.is_ok());
7227
7228 Ok(())
7229 }
7230}