1use crate::{
20 error::Error,
21 schema_equality, types,
22 util::MapHelper,
23 validator::{
24 validate_enum_symbol_name, validate_namespace, validate_record_field_name,
25 validate_schema_name,
26 },
27 AvroResult,
28};
29use digest::Digest;
30use log::{debug, error, warn};
31use serde::{
32 ser::{SerializeMap, SerializeSeq},
33 Deserialize, Serialize, Serializer,
34};
35use serde_json::{Map, Value};
36use std::{
37 borrow::Borrow,
38 collections::{BTreeMap, HashMap, HashSet},
39 fmt,
40 fmt::Debug,
41 hash::Hash,
42 io::Read,
43 str::FromStr,
44};
45use strum_macros::{Display, EnumDiscriminants, EnumString};
46
47pub struct SchemaFingerprint {
51 pub bytes: Vec<u8>,
52}
53
54impl fmt::Display for SchemaFingerprint {
55 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
56 write!(
57 f,
58 "{}",
59 self.bytes
60 .iter()
61 .map(|byte| format!("{byte:02x}"))
62 .collect::<Vec<String>>()
63 .join("")
64 )
65 }
66}
67
68#[derive(Clone, Debug, EnumDiscriminants, Display)]
72#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
73pub enum Schema {
74 Null,
76 Boolean,
78 Int,
80 Long,
82 Float,
84 Double,
86 Bytes,
89 String,
92 Array(ArraySchema),
95 Map(MapSchema),
99 Union(UnionSchema),
101 Record(RecordSchema),
103 Enum(EnumSchema),
105 Fixed(FixedSchema),
107 Decimal(DecimalSchema),
110 BigDecimal,
113 Uuid,
115 Date,
118 TimeMillis,
121 TimeMicros,
124 TimestampMillis,
126 TimestampMicros,
128 TimestampNanos,
130 LocalTimestampMillis,
132 LocalTimestampMicros,
134 LocalTimestampNanos,
136 Duration,
138 Ref { name: Name },
140}
141
142#[derive(Clone, Debug, PartialEq)]
143pub struct MapSchema {
144 pub types: Box<Schema>,
145 pub attributes: BTreeMap<String, Value>,
146}
147
148#[derive(Clone, Debug, PartialEq)]
149pub struct ArraySchema {
150 pub items: Box<Schema>,
151 pub attributes: BTreeMap<String, Value>,
152}
153
154impl PartialEq for Schema {
155 fn eq(&self, other: &Self) -> bool {
160 schema_equality::compare_schemata(self, other)
161 }
162}
163
164impl SchemaKind {
165 pub fn is_primitive(self) -> bool {
166 matches!(
167 self,
168 SchemaKind::Null
169 | SchemaKind::Boolean
170 | SchemaKind::Int
171 | SchemaKind::Long
172 | SchemaKind::Double
173 | SchemaKind::Float
174 | SchemaKind::Bytes
175 | SchemaKind::String,
176 )
177 }
178
179 pub fn is_named(self) -> bool {
180 matches!(
181 self,
182 SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed | SchemaKind::Ref
183 )
184 }
185}
186
187impl From<&types::Value> for SchemaKind {
188 fn from(value: &types::Value) -> Self {
189 use crate::types::Value;
190 match value {
191 Value::Null => Self::Null,
192 Value::Boolean(_) => Self::Boolean,
193 Value::Int(_) => Self::Int,
194 Value::Long(_) => Self::Long,
195 Value::Float(_) => Self::Float,
196 Value::Double(_) => Self::Double,
197 Value::Bytes(_) => Self::Bytes,
198 Value::String(_) => Self::String,
199 Value::Array(_) => Self::Array,
200 Value::Map(_) => Self::Map,
201 Value::Union(_, _) => Self::Union,
202 Value::Record(_) => Self::Record,
203 Value::Enum(_, _) => Self::Enum,
204 Value::Fixed(_, _) => Self::Fixed,
205 Value::Decimal { .. } => Self::Decimal,
206 Value::BigDecimal(_) => Self::BigDecimal,
207 Value::Uuid(_) => Self::Uuid,
208 Value::Date(_) => Self::Date,
209 Value::TimeMillis(_) => Self::TimeMillis,
210 Value::TimeMicros(_) => Self::TimeMicros,
211 Value::TimestampMillis(_) => Self::TimestampMillis,
212 Value::TimestampMicros(_) => Self::TimestampMicros,
213 Value::TimestampNanos(_) => Self::TimestampNanos,
214 Value::LocalTimestampMillis(_) => Self::LocalTimestampMillis,
215 Value::LocalTimestampMicros(_) => Self::LocalTimestampMicros,
216 Value::LocalTimestampNanos(_) => Self::LocalTimestampNanos,
217 Value::Duration { .. } => Self::Duration,
218 }
219 }
220}
221
222#[derive(Clone, Debug, Hash, PartialEq, Eq)]
233pub struct Name {
234 pub name: String,
235 pub namespace: Namespace,
236}
237
238pub type Documentation = Option<String>;
240pub type Aliases = Option<Vec<Alias>>;
242pub(crate) type Names = HashMap<Name, Schema>;
244pub type NamesRef<'a> = HashMap<Name, &'a Schema>;
246pub type Namespace = Option<String>;
248
249impl Name {
250 pub fn new(name: &str) -> AvroResult<Self> {
254 let (name, namespace) = Name::get_name_and_namespace(name)?;
255 Ok(Self {
256 name,
257 namespace: namespace.filter(|ns| !ns.is_empty()),
258 })
259 }
260
261 fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
262 validate_schema_name(name)
263 }
264
265 pub(crate) fn parse(
267 complex: &Map<String, Value>,
268 enclosing_namespace: &Namespace,
269 ) -> AvroResult<Self> {
270 let (name, namespace_from_name) = complex
271 .name()
272 .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
273 .ok_or(Error::GetNameField)?;
274 let type_name = match complex.get("type") {
276 Some(Value::Object(complex_type)) => complex_type.name().or(None),
277 _ => None,
278 };
279
280 let namespace = namespace_from_name
281 .or_else(|| {
282 complex
283 .string("namespace")
284 .or_else(|| enclosing_namespace.clone())
285 })
286 .filter(|ns| !ns.is_empty());
287
288 if let Some(ref ns) = namespace {
289 validate_namespace(ns)?;
290 }
291
292 Ok(Self {
293 name: type_name.unwrap_or(name),
294 namespace,
295 })
296 }
297
298 pub fn fullname(&self, default_namespace: Namespace) -> String {
303 if self.name.contains('.') {
304 self.name.clone()
305 } else {
306 let namespace = self.namespace.clone().or(default_namespace);
307
308 match namespace {
309 Some(ref namespace) if !namespace.is_empty() => {
310 format!("{}.{}", namespace, self.name)
311 }
312 _ => self.name.clone(),
313 }
314 }
315 }
316
317 pub fn fully_qualified_name(&self, enclosing_namespace: &Namespace) -> Name {
331 Name {
332 name: self.name.clone(),
333 namespace: self
334 .namespace
335 .clone()
336 .or_else(|| enclosing_namespace.clone().filter(|ns| !ns.is_empty())),
337 }
338 }
339}
340
341impl From<&str> for Name {
342 fn from(name: &str) -> Self {
343 Name::new(name).unwrap()
344 }
345}
346
347impl fmt::Display for Name {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 f.write_str(&self.fullname(None)[..])
350 }
351}
352
353impl<'de> Deserialize<'de> for Name {
354 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
355 where
356 D: serde::de::Deserializer<'de>,
357 {
358 Value::deserialize(deserializer).and_then(|value| {
359 use serde::de::Error;
360 if let Value::Object(json) = value {
361 Name::parse(&json, &None).map_err(Error::custom)
362 } else {
363 Err(Error::custom(format!("Expected a JSON object: {value:?}")))
364 }
365 })
366 }
367}
368
369#[derive(Clone, Debug, Hash, PartialEq, Eq)]
372pub struct Alias(Name);
373
374impl Alias {
375 pub fn new(name: &str) -> AvroResult<Self> {
376 Name::new(name).map(Self)
377 }
378
379 pub fn name(&self) -> String {
380 self.0.name.clone()
381 }
382
383 pub fn namespace(&self) -> Namespace {
384 self.0.namespace.clone()
385 }
386
387 pub fn fullname(&self, default_namespace: Namespace) -> String {
388 self.0.fullname(default_namespace)
389 }
390
391 pub fn fully_qualified_name(&self, default_namespace: &Namespace) -> Name {
392 self.0.fully_qualified_name(default_namespace)
393 }
394}
395
396impl From<&str> for Alias {
397 fn from(name: &str) -> Self {
398 Alias::new(name).unwrap()
399 }
400}
401
402impl Serialize for Alias {
403 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
404 where
405 S: Serializer,
406 {
407 serializer.serialize_str(&self.fullname(None))
408 }
409}
410
411#[derive(Debug)]
412pub struct ResolvedSchema<'s> {
413 names_ref: NamesRef<'s>,
414 schemata: Vec<&'s Schema>,
415}
416
417impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
418 type Error = Error;
419
420 fn try_from(schema: &'s Schema) -> AvroResult<Self> {
421 let names = HashMap::new();
422 let mut rs = ResolvedSchema {
423 names_ref: names,
424 schemata: vec![schema],
425 };
426 rs.resolve(rs.get_schemata(), &None, None)?;
427 Ok(rs)
428 }
429}
430
431impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
432 type Error = Error;
433
434 fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
435 let names = HashMap::new();
436 let mut rs = ResolvedSchema {
437 names_ref: names,
438 schemata,
439 };
440 rs.resolve(rs.get_schemata(), &None, None)?;
441 Ok(rs)
442 }
443}
444
445impl<'s> ResolvedSchema<'s> {
446 pub fn get_schemata(&self) -> Vec<&'s Schema> {
447 self.schemata.clone()
448 }
449
450 pub fn get_names(&self) -> &NamesRef<'s> {
451 &self.names_ref
452 }
453
454 pub fn new_with_known_schemata<'n>(
458 schemata_to_resolve: Vec<&'s Schema>,
459 enclosing_namespace: &Namespace,
460 known_schemata: &'n NamesRef<'n>,
461 ) -> AvroResult<Self> {
462 let names = HashMap::new();
463 let mut rs = ResolvedSchema {
464 names_ref: names,
465 schemata: schemata_to_resolve,
466 };
467 rs.resolve(rs.get_schemata(), enclosing_namespace, Some(known_schemata))?;
468 Ok(rs)
469 }
470
471 fn resolve<'n>(
472 &mut self,
473 schemata: Vec<&'s Schema>,
474 enclosing_namespace: &Namespace,
475 known_schemata: Option<&'n NamesRef<'n>>,
476 ) -> AvroResult<()> {
477 for schema in schemata {
478 match schema {
479 Schema::Array(schema) => {
480 self.resolve(vec![&schema.items], enclosing_namespace, known_schemata)?
481 }
482 Schema::Map(schema) => {
483 self.resolve(vec![&schema.types], enclosing_namespace, known_schemata)?
484 }
485 Schema::Union(UnionSchema { schemas, .. }) => {
486 for schema in schemas {
487 self.resolve(vec![schema], enclosing_namespace, known_schemata)?
488 }
489 }
490 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
491 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
492 if self
493 .names_ref
494 .insert(fully_qualified_name.clone(), schema)
495 .is_some()
496 {
497 return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
498 }
499 }
500 Schema::Record(RecordSchema { name, fields, .. }) => {
501 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
502 if self
503 .names_ref
504 .insert(fully_qualified_name.clone(), schema)
505 .is_some()
506 {
507 return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
508 } else {
509 let record_namespace = fully_qualified_name.namespace;
510 for field in fields {
511 self.resolve(vec![&field.schema], &record_namespace, known_schemata)?
512 }
513 }
514 }
515 Schema::Ref { name } => {
516 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
517 if !self.names_ref.contains_key(&fully_qualified_name) {
519 let is_resolved_with_known_schemas = known_schemata
520 .as_ref()
521 .map(|names| names.contains_key(&fully_qualified_name))
522 .unwrap_or(false);
523 if !is_resolved_with_known_schemas {
524 return Err(Error::SchemaResolutionError(fully_qualified_name));
525 }
526 }
527 }
528 _ => (),
529 }
530 }
531 Ok(())
532 }
533}
534
535pub(crate) struct ResolvedOwnedSchema {
536 names: Names,
537 root_schema: Schema,
538}
539
540impl TryFrom<Schema> for ResolvedOwnedSchema {
541 type Error = Error;
542
543 fn try_from(schema: Schema) -> AvroResult<Self> {
544 let names = HashMap::new();
545 let mut rs = ResolvedOwnedSchema {
546 names,
547 root_schema: schema,
548 };
549 resolve_names(&rs.root_schema, &mut rs.names, &None)?;
550 Ok(rs)
551 }
552}
553
554impl ResolvedOwnedSchema {
555 pub(crate) fn get_root_schema(&self) -> &Schema {
556 &self.root_schema
557 }
558 pub(crate) fn get_names(&self) -> &Names {
559 &self.names
560 }
561}
562
563pub(crate) fn resolve_names(
564 schema: &Schema,
565 names: &mut Names,
566 enclosing_namespace: &Namespace,
567) -> AvroResult<()> {
568 match schema {
569 Schema::Array(schema) => resolve_names(&schema.items, names, enclosing_namespace),
570 Schema::Map(schema) => resolve_names(&schema.types, names, enclosing_namespace),
571 Schema::Union(UnionSchema { schemas, .. }) => {
572 for schema in schemas {
573 resolve_names(schema, names, enclosing_namespace)?
574 }
575 Ok(())
576 }
577 Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
578 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
579 if names
580 .insert(fully_qualified_name.clone(), schema.clone())
581 .is_some()
582 {
583 Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
584 } else {
585 Ok(())
586 }
587 }
588 Schema::Record(RecordSchema { name, fields, .. }) => {
589 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
590 if names
591 .insert(fully_qualified_name.clone(), schema.clone())
592 .is_some()
593 {
594 Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
595 } else {
596 let record_namespace = fully_qualified_name.namespace;
597 for field in fields {
598 resolve_names(&field.schema, names, &record_namespace)?
599 }
600 Ok(())
601 }
602 }
603 Schema::Ref { name } => {
604 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
605 names
606 .get(&fully_qualified_name)
607 .map(|_| ())
608 .ok_or(Error::SchemaResolutionError(fully_qualified_name))
609 }
610 _ => Ok(()),
611 }
612}
613
614pub(crate) fn resolve_names_with_schemata(
615 schemata: &Vec<&Schema>,
616 names: &mut Names,
617 enclosing_namespace: &Namespace,
618) -> AvroResult<()> {
619 for schema in schemata {
620 resolve_names(schema, names, enclosing_namespace)?;
621 }
622 Ok(())
623}
624
625#[derive(bon::Builder, Clone, Debug, PartialEq)]
627pub struct RecordField {
628 pub name: String,
630 #[builder(default)]
632 pub doc: Documentation,
633 pub aliases: Option<Vec<String>>,
635 pub default: Option<Value>,
639 pub schema: Schema,
641 #[builder(default = RecordFieldOrder::Ignore)]
645 pub order: RecordFieldOrder,
646 #[builder(default)]
648 pub position: usize,
649 #[builder(default = BTreeMap::new())]
651 pub custom_attributes: BTreeMap<String, Value>,
652}
653
654#[derive(Clone, Debug, Eq, PartialEq, EnumString)]
656#[strum(serialize_all = "kebab_case")]
657pub enum RecordFieldOrder {
658 Ascending,
659 Descending,
660 Ignore,
661}
662
663impl RecordField {
664 fn parse(
666 field: &Map<String, Value>,
667 position: usize,
668 parser: &mut Parser,
669 enclosing_record: &Name,
670 ) -> AvroResult<Self> {
671 let name = field.name().ok_or(Error::GetNameFieldFromRecord)?;
672
673 validate_record_field_name(&name)?;
674
675 let schema = parser.parse_complex(
677 field,
678 &enclosing_record.namespace,
679 RecordSchemaParseLocation::FromField,
680 )?;
681
682 let default = field.get("default").cloned();
683 Self::resolve_default_value(
684 &schema,
685 &name,
686 &enclosing_record.fullname(None),
687 &parser.parsed_schemas,
688 &default,
689 )?;
690
691 let aliases = field.get("aliases").and_then(|aliases| {
692 aliases.as_array().map(|aliases| {
693 aliases
694 .iter()
695 .flat_map(|alias| alias.as_str())
696 .map(|alias| alias.to_string())
697 .collect::<Vec<String>>()
698 })
699 });
700
701 let order = field
702 .get("order")
703 .and_then(|order| order.as_str())
704 .and_then(|order| RecordFieldOrder::from_str(order).ok())
705 .unwrap_or(RecordFieldOrder::Ascending);
706
707 Ok(RecordField {
708 name,
709 doc: field.doc(),
710 default,
711 aliases,
712 order,
713 position,
714 custom_attributes: RecordField::get_field_custom_attributes(field, &schema),
715 schema,
716 })
717 }
718
719 fn resolve_default_value(
720 field_schema: &Schema,
721 field_name: &str,
722 record_name: &str,
723 names: &Names,
724 default: &Option<Value>,
725 ) -> AvroResult<()> {
726 if let Some(value) = default {
727 let avro_value = types::Value::from(value.clone());
728 match field_schema {
729 Schema::Union(union_schema) => {
730 let schemas = &union_schema.schemas;
731 let resolved = schemas.iter().any(|schema| {
732 avro_value
733 .to_owned()
734 .resolve_internal(schema, names, &schema.namespace(), &None)
735 .is_ok()
736 });
737
738 if !resolved {
739 let schema: Option<&Schema> = schemas.first();
740 return match schema {
741 Some(first_schema) => Err(Error::GetDefaultUnion(
742 SchemaKind::from(first_schema),
743 types::ValueKind::from(avro_value),
744 )),
745 None => Err(Error::EmptyUnion),
746 };
747 }
748 }
749 _ => {
750 let resolved = avro_value
751 .resolve_internal(field_schema, names, &field_schema.namespace(), &None)
752 .is_ok();
753
754 if !resolved {
755 return Err(Error::GetDefaultRecordField(
756 field_name.to_string(),
757 record_name.to_string(),
758 field_schema.canonical_form(),
759 ));
760 }
761 }
762 };
763 }
764
765 Ok(())
766 }
767
768 fn get_field_custom_attributes(
769 field: &Map<String, Value>,
770 schema: &Schema,
771 ) -> BTreeMap<String, Value> {
772 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
773 for (key, value) in field {
774 match key.as_str() {
775 "type" | "name" | "doc" | "default" | "order" | "position" | "aliases"
776 | "logicalType" => continue,
777 key if key == "symbols" && matches!(schema, Schema::Enum(_)) => continue,
778 key if key == "size" && matches!(schema, Schema::Fixed(_)) => continue,
779 _ => custom_attributes.insert(key.clone(), value.clone()),
780 };
781 }
782 custom_attributes
783 }
784
785 pub fn is_nullable(&self) -> bool {
787 match self.schema {
788 Schema::Union(ref inner) => inner.is_nullable(),
789 _ => false,
790 }
791 }
792}
793
794#[derive(bon::Builder, Debug, Clone)]
796pub struct RecordSchema {
797 pub name: Name,
799 #[builder(default)]
801 pub aliases: Aliases,
802 #[builder(default)]
804 pub doc: Documentation,
805 pub fields: Vec<RecordField>,
807 pub lookup: BTreeMap<String, usize>,
810 #[builder(default = BTreeMap::new())]
812 pub attributes: BTreeMap<String, Value>,
813}
814
815#[derive(bon::Builder, Debug, Clone)]
817pub struct EnumSchema {
818 pub name: Name,
820 #[builder(default)]
822 pub aliases: Aliases,
823 #[builder(default)]
825 pub doc: Documentation,
826 pub symbols: Vec<String>,
828 pub default: Option<String>,
830 #[builder(default = BTreeMap::new())]
832 pub attributes: BTreeMap<String, Value>,
833}
834
835#[derive(bon::Builder, Debug, Clone)]
837pub struct FixedSchema {
838 pub name: Name,
840 #[builder(default)]
842 pub aliases: Aliases,
843 #[builder(default)]
845 pub doc: Documentation,
846 pub size: usize,
848 pub default: Option<String>,
850 #[builder(default = BTreeMap::new())]
852 pub attributes: BTreeMap<String, Value>,
853}
854
855impl FixedSchema {
856 fn serialize_to_map<S>(&self, mut map: S::SerializeMap) -> Result<S::SerializeMap, S::Error>
857 where
858 S: Serializer,
859 {
860 map.serialize_entry("type", "fixed")?;
861 if let Some(ref n) = self.name.namespace {
862 map.serialize_entry("namespace", n)?;
863 }
864 map.serialize_entry("name", &self.name.name)?;
865 if let Some(ref docstr) = self.doc {
866 map.serialize_entry("doc", docstr)?;
867 }
868 map.serialize_entry("size", &self.size)?;
869
870 if let Some(ref aliases) = self.aliases {
871 map.serialize_entry("aliases", aliases)?;
872 }
873
874 for attr in &self.attributes {
875 map.serialize_entry(attr.0, attr.1)?;
876 }
877
878 Ok(map)
879 }
880}
881
882#[derive(Debug, Clone)]
887pub struct DecimalSchema {
888 pub precision: DecimalMetadata,
890 pub scale: DecimalMetadata,
892 pub inner: Box<Schema>,
894}
895
896#[derive(Debug, Clone)]
898pub struct UnionSchema {
899 pub(crate) schemas: Vec<Schema>,
901 variant_index: BTreeMap<SchemaKind, usize>,
906}
907
908impl UnionSchema {
909 pub fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
911 let mut vindex = BTreeMap::new();
912 for (i, schema) in schemas.iter().enumerate() {
913 if let Schema::Union(_) = schema {
914 return Err(Error::GetNestedUnion);
915 }
916 let kind = SchemaKind::from(schema);
917 if !kind.is_named() && vindex.insert(kind, i).is_some() {
918 return Err(Error::GetUnionDuplicate);
919 }
920 }
921 Ok(UnionSchema {
922 schemas,
923 variant_index: vindex,
924 })
925 }
926
927 pub fn variants(&self) -> &[Schema] {
929 &self.schemas
930 }
931
932 pub fn is_nullable(&self) -> bool {
934 self.schemas.iter().any(|x| matches!(x, Schema::Null))
935 }
936
937 #[deprecated(
940 since = "0.15.0",
941 note = "Please use `find_schema_with_known_schemata` instead"
942 )]
943 pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
944 self.find_schema_with_known_schemata::<Schema>(value, None, &None)
945 }
946
947 pub fn find_schema_with_known_schemata<S: Borrow<Schema> + Debug>(
953 &self,
954 value: &types::Value,
955 known_schemata: Option<&HashMap<Name, S>>,
956 enclosing_namespace: &Namespace,
957 ) -> Option<(usize, &Schema)> {
958 let schema_kind = SchemaKind::from(value);
959 if let Some(&i) = self.variant_index.get(&schema_kind) {
960 Some((i, &self.schemas[i]))
962 } else {
963 let mut collected_names: HashMap<Name, &Schema> = known_schemata
967 .map(|names| {
968 names
969 .iter()
970 .map(|(name, schema)| (name.clone(), schema.borrow()))
971 .collect()
972 })
973 .unwrap_or_default();
974
975 self.schemas.iter().enumerate().find(|(_, schema)| {
976 let resolved_schema = ResolvedSchema::new_with_known_schemata(
977 vec![*schema],
978 enclosing_namespace,
979 &collected_names,
980 )
981 .expect("Schema didn't successfully parse");
982 let resolved_names = resolved_schema.names_ref;
983
984 collected_names.extend(resolved_names);
986 let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());
987
988 value
989 .clone()
990 .resolve_internal(schema, &collected_names, namespace, &None)
991 .is_ok()
992 })
993 }
994 }
995}
996
997impl PartialEq for UnionSchema {
999 fn eq(&self, other: &UnionSchema) -> bool {
1000 self.schemas.eq(&other.schemas)
1001 }
1002}
1003
1004type DecimalMetadata = usize;
1005pub(crate) type Precision = DecimalMetadata;
1006pub(crate) type Scale = DecimalMetadata;
1007
1008fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
1009 Ok(if value.is_u64() {
1010 let num = value
1011 .as_u64()
1012 .ok_or_else(|| Error::GetU64FromJson(value.clone()))?;
1013 num.try_into()
1014 .map_err(|e| Error::ConvertU64ToUsize(e, num))?
1015 } else if value.is_i64() {
1016 let num = value
1017 .as_i64()
1018 .ok_or_else(|| Error::GetI64FromJson(value.clone()))?;
1019 num.try_into()
1020 .map_err(|e| Error::ConvertI64ToUsize(e, num))?
1021 } else {
1022 return Err(Error::GetPrecisionOrScaleFromJson(value.clone()));
1023 })
1024}
1025
1026#[derive(Debug, Default)]
1027enum RecordSchemaParseLocation {
1028 #[default]
1030 Root,
1031
1032 FromField,
1034}
1035
1036#[derive(Default)]
1037struct Parser {
1038 input_schemas: HashMap<Name, Value>,
1039 resolving_schemas: Names,
1043 input_order: Vec<Name>,
1044 parsed_schemas: Names,
1047}
1048
1049impl Schema {
1050 pub fn canonical_form(&self) -> String {
1055 let json = serde_json::to_value(self)
1056 .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
1057 let mut defined_names = HashSet::new();
1058 parsing_canonical_form(&json, &mut defined_names)
1059 }
1060
1061 pub fn independent_canonical_form(&self, schemata: &Vec<Schema>) -> Result<String, Error> {
1067 let mut this = self.clone();
1068 this.denormalize(schemata)?;
1069 Ok(this.canonical_form())
1070 }
1071
1072 pub fn fingerprint<D: Digest>(&self) -> SchemaFingerprint {
1079 let mut d = D::new();
1080 d.update(self.canonical_form());
1081 SchemaFingerprint {
1082 bytes: d.finalize().to_vec(),
1083 }
1084 }
1085
1086 pub fn parse_str(input: &str) -> Result<Schema, Error> {
1088 let mut parser = Parser::default();
1089 parser.parse_str(input)
1090 }
1091
1092 pub fn parse_list(input: impl IntoIterator<Item = impl AsRef<str>>) -> AvroResult<Vec<Schema>> {
1100 let input = input.into_iter();
1101 let input_len = input.size_hint().0;
1102 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(input_len);
1103 let mut input_order: Vec<Name> = Vec::with_capacity(input_len);
1104 for json in input {
1105 let json = json.as_ref();
1106 let schema: Value = serde_json::from_str(json).map_err(Error::ParseSchemaJson)?;
1107 if let Value::Object(inner) = &schema {
1108 let name = Name::parse(inner, &None)?;
1109 let previous_value = input_schemas.insert(name.clone(), schema);
1110 if previous_value.is_some() {
1111 return Err(Error::NameCollision(name.fullname(None)));
1112 }
1113 input_order.push(name);
1114 } else {
1115 return Err(Error::GetNameField);
1116 }
1117 }
1118 let mut parser = Parser {
1119 input_schemas,
1120 resolving_schemas: HashMap::default(),
1121 input_order,
1122 parsed_schemas: HashMap::with_capacity(input_len),
1123 };
1124 parser.parse_list()
1125 }
1126
1127 pub fn parse_str_with_list(
1140 schema: &str,
1141 schemata: impl IntoIterator<Item = impl AsRef<str>>,
1142 ) -> AvroResult<(Schema, Vec<Schema>)> {
1143 let schemata = schemata.into_iter();
1144 let schemata_len = schemata.size_hint().0;
1145 let mut input_schemas: HashMap<Name, Value> = HashMap::with_capacity(schemata_len);
1146 let mut input_order: Vec<Name> = Vec::with_capacity(schemata_len);
1147 for json in schemata {
1148 let json = json.as_ref();
1149 let schema: Value = serde_json::from_str(json).map_err(Error::ParseSchemaJson)?;
1150 if let Value::Object(inner) = &schema {
1151 let name = Name::parse(inner, &None)?;
1152 if let Some(_previous) = input_schemas.insert(name.clone(), schema) {
1153 return Err(Error::NameCollision(name.fullname(None)));
1154 }
1155 input_order.push(name);
1156 } else {
1157 return Err(Error::GetNameField);
1158 }
1159 }
1160 let mut parser = Parser {
1161 input_schemas,
1162 resolving_schemas: HashMap::default(),
1163 input_order,
1164 parsed_schemas: HashMap::with_capacity(schemata_len),
1165 };
1166 parser.parse_input_schemas()?;
1167
1168 let value = serde_json::from_str(schema).map_err(Error::ParseSchemaJson)?;
1169 let schema = parser.parse(&value, &None)?;
1170 let schemata = parser.parse_list()?;
1171 Ok((schema, schemata))
1172 }
1173
1174 pub fn parse_reader(reader: &mut (impl Read + ?Sized)) -> AvroResult<Schema> {
1176 let mut buf = String::new();
1177 match reader.read_to_string(&mut buf) {
1178 Ok(_) => Self::parse_str(&buf),
1179 Err(e) => Err(Error::ReadSchemaFromReader(e)),
1180 }
1181 }
1182
1183 pub fn parse(value: &Value) -> AvroResult<Schema> {
1185 let mut parser = Parser::default();
1186 parser.parse(value, &None)
1187 }
1188
1189 pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
1192 let mut parser = Parser {
1193 input_schemas: HashMap::with_capacity(1),
1194 resolving_schemas: Names::default(),
1195 input_order: Vec::with_capacity(1),
1196 parsed_schemas: names,
1197 };
1198 parser.parse(value, &None)
1199 }
1200
1201 pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
1203 match self {
1204 Schema::Record(RecordSchema { attributes, .. })
1205 | Schema::Enum(EnumSchema { attributes, .. })
1206 | Schema::Fixed(FixedSchema { attributes, .. })
1207 | Schema::Array(ArraySchema { attributes, .. })
1208 | Schema::Map(MapSchema { attributes, .. }) => Some(attributes),
1209 _ => None,
1210 }
1211 }
1212
1213 pub fn name(&self) -> Option<&Name> {
1215 match self {
1216 Schema::Ref { name, .. }
1217 | Schema::Record(RecordSchema { name, .. })
1218 | Schema::Enum(EnumSchema { name, .. })
1219 | Schema::Fixed(FixedSchema { name, .. }) => Some(name),
1220 _ => None,
1221 }
1222 }
1223
1224 pub fn namespace(&self) -> Namespace {
1226 self.name().and_then(|n| n.namespace.clone())
1227 }
1228
1229 pub fn aliases(&self) -> Option<&Vec<Alias>> {
1231 match self {
1232 Schema::Record(RecordSchema { aliases, .. })
1233 | Schema::Enum(EnumSchema { aliases, .. })
1234 | Schema::Fixed(FixedSchema { aliases, .. }) => aliases.as_ref(),
1235 _ => None,
1236 }
1237 }
1238
1239 pub fn doc(&self) -> Option<&String> {
1241 match self {
1242 Schema::Record(RecordSchema { doc, .. })
1243 | Schema::Enum(EnumSchema { doc, .. })
1244 | Schema::Fixed(FixedSchema { doc, .. }) => doc.as_ref(),
1245 _ => None,
1246 }
1247 }
1248
1249 pub fn map(types: Schema) -> Self {
1251 Schema::Map(MapSchema {
1252 types: Box::new(types),
1253 attributes: Default::default(),
1254 })
1255 }
1256
1257 pub fn map_with_attributes(types: Schema, attributes: BTreeMap<String, Value>) -> Self {
1259 Schema::Map(MapSchema {
1260 types: Box::new(types),
1261 attributes,
1262 })
1263 }
1264
1265 pub fn array(items: Schema) -> Self {
1267 Schema::Array(ArraySchema {
1268 items: Box::new(items),
1269 attributes: Default::default(),
1270 })
1271 }
1272
1273 pub fn array_with_attributes(items: Schema, attributes: BTreeMap<String, Value>) -> Self {
1275 Schema::Array(ArraySchema {
1276 items: Box::new(items),
1277 attributes,
1278 })
1279 }
1280
1281 fn denormalize(&mut self, schemata: &Vec<Schema>) -> AvroResult<()> {
1282 match self {
1283 Schema::Ref { name } => {
1284 let replacement_schema = schemata
1285 .iter()
1286 .find(|s| s.name().map(|n| *n == *name).unwrap_or(false));
1287 if let Some(schema) = replacement_schema {
1288 let mut denorm = schema.clone();
1289 denorm.denormalize(schemata)?;
1290 *self = denorm;
1291 } else {
1292 return Err(Error::SchemaResolutionError(name.clone()));
1293 }
1294 }
1295 Schema::Record(record_schema) => {
1296 for field in &mut record_schema.fields {
1297 field.schema.denormalize(schemata)?;
1298 }
1299 }
1300 Schema::Array(array_schema) => {
1301 array_schema.items.denormalize(schemata)?;
1302 }
1303 Schema::Map(map_schema) => {
1304 map_schema.types.denormalize(schemata)?;
1305 }
1306 Schema::Union(union_schema) => {
1307 for schema in &mut union_schema.schemas {
1308 schema.denormalize(schemata)?;
1309 }
1310 }
1311 _ => (),
1312 }
1313 Ok(())
1314 }
1315}
1316
1317impl Parser {
1318 fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
1320 let value = serde_json::from_str(input).map_err(Error::ParseSchemaJson)?;
1321 self.parse(&value, &None)
1322 }
1323
1324 fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
1327 self.parse_input_schemas()?;
1328
1329 let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
1330 for name in self.input_order.drain(0..) {
1331 let parsed = self
1332 .parsed_schemas
1333 .remove(&name)
1334 .expect("One of the input schemas was unexpectedly not parsed");
1335 parsed_schemas.push(parsed);
1336 }
1337 Ok(parsed_schemas)
1338 }
1339
1340 fn parse_input_schemas(&mut self) -> Result<(), Error> {
1342 while !self.input_schemas.is_empty() {
1343 let next_name = self
1344 .input_schemas
1345 .keys()
1346 .next()
1347 .expect("Input schemas unexpectedly empty")
1348 .to_owned();
1349 let (name, value) = self
1350 .input_schemas
1351 .remove_entry(&next_name)
1352 .expect("Key unexpectedly missing");
1353 let parsed = self.parse(&value, &None)?;
1354 self.parsed_schemas
1355 .insert(get_schema_type_name(name, value), parsed);
1356 }
1357 Ok(())
1358 }
1359
1360 fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> {
1363 match *value {
1364 Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace),
1365 Value::Object(ref data) => {
1366 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1367 }
1368 Value::Array(ref data) => self.parse_union(data, enclosing_namespace),
1369 _ => Err(Error::ParseSchemaFromValidJson),
1370 }
1371 }
1372
1373 fn parse_known_schema(
1377 &mut self,
1378 name: &str,
1379 enclosing_namespace: &Namespace,
1380 ) -> AvroResult<Schema> {
1381 match name {
1382 "null" => Ok(Schema::Null),
1383 "boolean" => Ok(Schema::Boolean),
1384 "int" => Ok(Schema::Int),
1385 "long" => Ok(Schema::Long),
1386 "double" => Ok(Schema::Double),
1387 "float" => Ok(Schema::Float),
1388 "bytes" => Ok(Schema::Bytes),
1389 "string" => Ok(Schema::String),
1390 _ => self.fetch_schema_ref(name, enclosing_namespace),
1391 }
1392 }
1393
1394 fn fetch_schema_ref(
1404 &mut self,
1405 name: &str,
1406 enclosing_namespace: &Namespace,
1407 ) -> AvroResult<Schema> {
1408 fn get_schema_ref(parsed: &Schema) -> Schema {
1409 match &parsed {
1410 Schema::Record(RecordSchema { ref name, .. })
1411 | Schema::Enum(EnumSchema { ref name, .. })
1412 | Schema::Fixed(FixedSchema { ref name, .. }) => Schema::Ref { name: name.clone() },
1413 _ => parsed.clone(),
1414 }
1415 }
1416
1417 let name = Name::new(name)?;
1418 let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
1419
1420 if self.parsed_schemas.contains_key(&fully_qualified_name) {
1421 return Ok(Schema::Ref {
1422 name: fully_qualified_name,
1423 });
1424 }
1425 if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) {
1426 return Ok(resolving_schema.clone());
1427 }
1428
1429 match name.name.as_str() {
1431 "record" | "enum" | "fixed" => {
1432 return Err(Error::InvalidSchemaRecord(name.to_string()));
1433 }
1434 _ => (),
1435 }
1436
1437 let value = self
1438 .input_schemas
1439 .remove(&fully_qualified_name)
1440 .ok_or_else(|| Error::ParsePrimitive(fully_qualified_name.fullname(None)))?;
1442
1443 let parsed = self.parse(&value, &None)?;
1445 self.parsed_schemas
1446 .insert(get_schema_type_name(name, value), parsed.clone());
1447
1448 Ok(get_schema_ref(&parsed))
1449 }
1450
1451 fn parse_precision_and_scale(
1452 complex: &Map<String, Value>,
1453 ) -> Result<(Precision, Scale), Error> {
1454 fn get_decimal_integer(
1455 complex: &Map<String, Value>,
1456 key: &'static str,
1457 ) -> Result<DecimalMetadata, Error> {
1458 match complex.get(key) {
1459 Some(Value::Number(value)) => parse_json_integer_for_decimal(value),
1460 None => {
1461 if key == "scale" {
1462 Ok(0)
1463 } else {
1464 Err(Error::GetDecimalMetadataFromJson(key))
1465 }
1466 }
1467 Some(value) => Err(Error::GetDecimalMetadataValueFromJson {
1468 key: key.into(),
1469 value: value.clone(),
1470 }),
1471 }
1472 }
1473 let precision = get_decimal_integer(complex, "precision")?;
1474 let scale = get_decimal_integer(complex, "scale")?;
1475
1476 if precision < 1 {
1477 return Err(Error::DecimalPrecisionMuBePositive { precision });
1478 }
1479
1480 if precision < scale {
1481 Err(Error::DecimalPrecisionLessThanScale { precision, scale })
1482 } else {
1483 Ok((precision, scale))
1484 }
1485 }
1486
1487 fn parse_complex(
1493 &mut self,
1494 complex: &Map<String, Value>,
1495 enclosing_namespace: &Namespace,
1496 parse_location: RecordSchemaParseLocation,
1497 ) -> AvroResult<Schema> {
1498 fn parse_as_native_complex(
1500 complex: &Map<String, Value>,
1501 parser: &mut Parser,
1502 enclosing_namespace: &Namespace,
1503 ) -> AvroResult<Schema> {
1504 match complex.get("type") {
1505 Some(value) => match value {
1506 Value::String(s) if s == "fixed" => {
1507 parser.parse_fixed(complex, enclosing_namespace)
1508 }
1509 _ => parser.parse(value, enclosing_namespace),
1510 },
1511 None => Err(Error::GetLogicalTypeField),
1512 }
1513 }
1514
1515 fn try_convert_to_logical_type<F>(
1522 logical_type: &str,
1523 schema: Schema,
1524 supported_schema_kinds: &[SchemaKind],
1525 convert: F,
1526 ) -> AvroResult<Schema>
1527 where
1528 F: Fn(Schema) -> AvroResult<Schema>,
1529 {
1530 let kind = SchemaKind::from(schema.clone());
1531 if supported_schema_kinds.contains(&kind) {
1532 convert(schema)
1533 } else {
1534 warn!(
1535 "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!"
1536 );
1537 Ok(schema)
1538 }
1539 }
1540
1541 match complex.get("logicalType") {
1542 Some(Value::String(t)) => match t.as_str() {
1543 "decimal" => {
1544 return try_convert_to_logical_type(
1545 "decimal",
1546 parse_as_native_complex(complex, self, enclosing_namespace)?,
1547 &[SchemaKind::Fixed, SchemaKind::Bytes],
1548 |inner| -> AvroResult<Schema> {
1549 match Self::parse_precision_and_scale(complex) {
1550 Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
1551 precision,
1552 scale,
1553 inner: Box::new(inner),
1554 })),
1555 Err(err) => {
1556 warn!("Ignoring invalid decimal logical type: {err}");
1557 Ok(inner)
1558 }
1559 }
1560 },
1561 );
1562 }
1563 "big-decimal" => {
1564 return try_convert_to_logical_type(
1565 "big-decimal",
1566 parse_as_native_complex(complex, self, enclosing_namespace)?,
1567 &[SchemaKind::Bytes],
1568 |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
1569 );
1570 }
1571 "uuid" => {
1572 return try_convert_to_logical_type(
1573 "uuid",
1574 parse_as_native_complex(complex, self, enclosing_namespace)?,
1575 &[SchemaKind::String, SchemaKind::Fixed],
1576 |schema| match schema {
1577 Schema::String => Ok(Schema::Uuid),
1578 Schema::Fixed(FixedSchema { size: 16, .. }) => Ok(Schema::Uuid),
1579 Schema::Fixed(FixedSchema { size, .. }) => {
1580 warn!("Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {schema:?}");
1581 Ok(schema)
1582 }
1583 _ => {
1584 warn!("Ignoring invalid uuid logical type for schema: {schema:?}");
1585 Ok(schema)
1586 }
1587 },
1588 );
1589 }
1590 "date" => {
1591 return try_convert_to_logical_type(
1592 "date",
1593 parse_as_native_complex(complex, self, enclosing_namespace)?,
1594 &[SchemaKind::Int],
1595 |_| -> AvroResult<Schema> { Ok(Schema::Date) },
1596 );
1597 }
1598 "time-millis" => {
1599 return try_convert_to_logical_type(
1600 "date",
1601 parse_as_native_complex(complex, self, enclosing_namespace)?,
1602 &[SchemaKind::Int],
1603 |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
1604 );
1605 }
1606 "time-micros" => {
1607 return try_convert_to_logical_type(
1608 "time-micros",
1609 parse_as_native_complex(complex, self, enclosing_namespace)?,
1610 &[SchemaKind::Long],
1611 |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
1612 );
1613 }
1614 "timestamp-millis" => {
1615 return try_convert_to_logical_type(
1616 "timestamp-millis",
1617 parse_as_native_complex(complex, self, enclosing_namespace)?,
1618 &[SchemaKind::Long],
1619 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
1620 );
1621 }
1622 "timestamp-micros" => {
1623 return try_convert_to_logical_type(
1624 "timestamp-micros",
1625 parse_as_native_complex(complex, self, enclosing_namespace)?,
1626 &[SchemaKind::Long],
1627 |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
1628 );
1629 }
1630 "timestamp-nanos" => {
1631 return try_convert_to_logical_type(
1632 "timestamp-nanos",
1633 parse_as_native_complex(complex, self, enclosing_namespace)?,
1634 &[SchemaKind::Long],
1635 |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
1636 );
1637 }
1638 "local-timestamp-millis" => {
1639 return try_convert_to_logical_type(
1640 "local-timestamp-millis",
1641 parse_as_native_complex(complex, self, enclosing_namespace)?,
1642 &[SchemaKind::Long],
1643 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
1644 );
1645 }
1646 "local-timestamp-micros" => {
1647 return try_convert_to_logical_type(
1648 "local-timestamp-micros",
1649 parse_as_native_complex(complex, self, enclosing_namespace)?,
1650 &[SchemaKind::Long],
1651 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
1652 );
1653 }
1654 "local-timestamp-nanos" => {
1655 return try_convert_to_logical_type(
1656 "local-timestamp-nanos",
1657 parse_as_native_complex(complex, self, enclosing_namespace)?,
1658 &[SchemaKind::Long],
1659 |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
1660 );
1661 }
1662 "duration" => {
1663 return try_convert_to_logical_type(
1664 "duration",
1665 parse_as_native_complex(complex, self, enclosing_namespace)?,
1666 &[SchemaKind::Fixed],
1667 |_| -> AvroResult<Schema> { Ok(Schema::Duration) },
1668 );
1669 }
1670 _ => {}
1673 },
1674 Some(value) => return Err(Error::GetLogicalTypeFieldType(value.clone())),
1678 _ => {}
1679 }
1680 match complex.get("type") {
1681 Some(Value::String(t)) => match t.as_str() {
1682 "record" => match parse_location {
1683 RecordSchemaParseLocation::Root => {
1684 self.parse_record(complex, enclosing_namespace)
1685 }
1686 RecordSchemaParseLocation::FromField => {
1687 self.fetch_schema_ref(t, enclosing_namespace)
1688 }
1689 },
1690 "enum" => self.parse_enum(complex, enclosing_namespace),
1691 "array" => self.parse_array(complex, enclosing_namespace),
1692 "map" => self.parse_map(complex, enclosing_namespace),
1693 "fixed" => self.parse_fixed(complex, enclosing_namespace),
1694 other => self.parse_known_schema(other, enclosing_namespace),
1695 },
1696 Some(Value::Object(data)) => {
1697 self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
1698 }
1699 Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
1700 Some(unknown) => Err(Error::GetComplexType(unknown.clone())),
1701 None => Err(Error::GetComplexTypeField),
1702 }
1703 }
1704
1705 fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
1706 let resolving_schema = Schema::Ref { name: name.clone() };
1707 self.resolving_schemas
1708 .insert(name.clone(), resolving_schema.clone());
1709
1710 let namespace = &name.namespace;
1711
1712 if let Some(ref aliases) = aliases {
1713 aliases.iter().for_each(|alias| {
1714 let alias_fullname = alias.fully_qualified_name(namespace);
1715 self.resolving_schemas
1716 .insert(alias_fullname, resolving_schema.clone());
1717 });
1718 }
1719 }
1720
1721 fn register_parsed_schema(
1722 &mut self,
1723 fully_qualified_name: &Name,
1724 schema: &Schema,
1725 aliases: &Aliases,
1726 ) {
1727 self.parsed_schemas
1730 .insert(fully_qualified_name.clone(), schema.clone());
1731 self.resolving_schemas.remove(fully_qualified_name);
1732
1733 let namespace = &fully_qualified_name.namespace;
1734
1735 if let Some(ref aliases) = aliases {
1736 aliases.iter().for_each(|alias| {
1737 let alias_fullname = alias.fully_qualified_name(namespace);
1738 self.resolving_schemas.remove(&alias_fullname);
1739 self.parsed_schemas.insert(alias_fullname, schema.clone());
1740 });
1741 }
1742 }
1743
1744 fn get_already_seen_schema(
1746 &self,
1747 complex: &Map<String, Value>,
1748 enclosing_namespace: &Namespace,
1749 ) -> Option<&Schema> {
1750 match complex.get("type") {
1751 Some(Value::String(ref typ)) => {
1752 let name = Name::new(typ.as_str())
1753 .unwrap()
1754 .fully_qualified_name(enclosing_namespace);
1755 self.resolving_schemas
1756 .get(&name)
1757 .or_else(|| self.parsed_schemas.get(&name))
1758 }
1759 _ => None,
1760 }
1761 }
1762
1763 fn parse_record(
1766 &mut self,
1767 complex: &Map<String, Value>,
1768 enclosing_namespace: &Namespace,
1769 ) -> AvroResult<Schema> {
1770 let fields_opt = complex.get("fields");
1771
1772 if fields_opt.is_none() {
1773 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1774 return Ok(seen.clone());
1775 }
1776 }
1777
1778 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1779 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1780
1781 let mut lookup = BTreeMap::new();
1782
1783 self.register_resolving_schema(&fully_qualified_name, &aliases);
1784
1785 debug!("Going to parse record schema: {:?}", &fully_qualified_name);
1786
1787 let fields: Vec<RecordField> = fields_opt
1788 .and_then(|fields| fields.as_array())
1789 .ok_or(Error::GetRecordFieldsJson)
1790 .and_then(|fields| {
1791 fields
1792 .iter()
1793 .filter_map(|field| field.as_object())
1794 .enumerate()
1795 .map(|(position, field)| {
1796 RecordField::parse(field, position, self, &fully_qualified_name)
1797 })
1798 .collect::<Result<_, _>>()
1799 })?;
1800
1801 for field in &fields {
1802 if let Some(_old) = lookup.insert(field.name.clone(), field.position) {
1803 return Err(Error::FieldNameDuplicate(field.name.clone()));
1804 }
1805
1806 if let Some(ref field_aliases) = field.aliases {
1807 for alias in field_aliases {
1808 lookup.insert(alias.clone(), field.position);
1809 }
1810 }
1811 }
1812
1813 let schema = Schema::Record(RecordSchema {
1814 name: fully_qualified_name.clone(),
1815 aliases: aliases.clone(),
1816 doc: complex.doc(),
1817 fields,
1818 lookup,
1819 attributes: self.get_custom_attributes(complex, vec!["fields"]),
1820 });
1821
1822 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1823 Ok(schema)
1824 }
1825
1826 fn get_custom_attributes(
1827 &self,
1828 complex: &Map<String, Value>,
1829 excluded: Vec<&'static str>,
1830 ) -> BTreeMap<String, Value> {
1831 let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
1832 for (key, value) in complex {
1833 match key.as_str() {
1834 "type" | "name" | "namespace" | "doc" | "aliases" => continue,
1835 candidate if excluded.contains(&candidate) => continue,
1836 _ => custom_attributes.insert(key.clone(), value.clone()),
1837 };
1838 }
1839 custom_attributes
1840 }
1841
1842 fn parse_enum(
1845 &mut self,
1846 complex: &Map<String, Value>,
1847 enclosing_namespace: &Namespace,
1848 ) -> AvroResult<Schema> {
1849 let symbols_opt = complex.get("symbols");
1850
1851 if symbols_opt.is_none() {
1852 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1853 return Ok(seen.clone());
1854 }
1855 }
1856
1857 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
1858 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
1859
1860 let symbols: Vec<String> = symbols_opt
1861 .and_then(|v| v.as_array())
1862 .ok_or(Error::GetEnumSymbolsField)
1863 .and_then(|symbols| {
1864 symbols
1865 .iter()
1866 .map(|symbol| symbol.as_str().map(|s| s.to_string()))
1867 .collect::<Option<_>>()
1868 .ok_or(Error::GetEnumSymbols)
1869 })?;
1870
1871 let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
1872 for symbol in symbols.iter() {
1873 validate_enum_symbol_name(symbol)?;
1874
1875 if existing_symbols.contains(&symbol) {
1877 return Err(Error::EnumSymbolDuplicate(symbol.to_string()));
1878 }
1879
1880 existing_symbols.insert(symbol);
1881 }
1882
1883 let mut default: Option<String> = None;
1884 if let Some(value) = complex.get("default") {
1885 if let Value::String(ref s) = *value {
1886 default = Some(s.clone());
1887 } else {
1888 return Err(Error::EnumDefaultWrongType(value.clone()));
1889 }
1890 }
1891
1892 if let Some(ref value) = default {
1893 let resolved = types::Value::from(value.clone())
1894 .resolve_enum(&symbols, &Some(value.to_string()), &None)
1895 .is_ok();
1896 if !resolved {
1897 return Err(Error::GetEnumDefault {
1898 symbol: value.to_string(),
1899 symbols,
1900 });
1901 }
1902 }
1903
1904 let schema = Schema::Enum(EnumSchema {
1905 name: fully_qualified_name.clone(),
1906 aliases: aliases.clone(),
1907 doc: complex.doc(),
1908 symbols,
1909 default,
1910 attributes: self.get_custom_attributes(complex, vec!["symbols"]),
1911 });
1912
1913 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
1914
1915 Ok(schema)
1916 }
1917
1918 fn parse_array(
1921 &mut self,
1922 complex: &Map<String, Value>,
1923 enclosing_namespace: &Namespace,
1924 ) -> AvroResult<Schema> {
1925 complex
1926 .get("items")
1927 .ok_or(Error::GetArrayItemsField)
1928 .and_then(|items| self.parse(items, enclosing_namespace))
1929 .map(|items| {
1930 Schema::array_with_attributes(
1931 items,
1932 self.get_custom_attributes(complex, vec!["items"]),
1933 )
1934 })
1935 }
1936
1937 fn parse_map(
1940 &mut self,
1941 complex: &Map<String, Value>,
1942 enclosing_namespace: &Namespace,
1943 ) -> AvroResult<Schema> {
1944 complex
1945 .get("values")
1946 .ok_or(Error::GetMapValuesField)
1947 .and_then(|items| self.parse(items, enclosing_namespace))
1948 .map(|items| {
1949 Schema::map_with_attributes(
1950 items,
1951 self.get_custom_attributes(complex, vec!["values"]),
1952 )
1953 })
1954 }
1955
1956 fn parse_union(
1959 &mut self,
1960 items: &[Value],
1961 enclosing_namespace: &Namespace,
1962 ) -> AvroResult<Schema> {
1963 items
1964 .iter()
1965 .map(|v| self.parse(v, enclosing_namespace))
1966 .collect::<Result<Vec<_>, _>>()
1967 .and_then(|schemas| {
1968 if schemas.is_empty() {
1969 error!(
1970 "Union schemas should have at least two members! \
1971 Please enable debug logging to find out which Record schema \
1972 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1973 );
1974 } else if schemas.len() == 1 {
1975 warn!(
1976 "Union schema with just one member! Consider dropping the union! \
1977 Please enable debug logging to find out which Record schema \
1978 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
1979 );
1980 }
1981 Ok(Schema::Union(UnionSchema::new(schemas)?))
1982 })
1983 }
1984
1985 fn parse_fixed(
1988 &mut self,
1989 complex: &Map<String, Value>,
1990 enclosing_namespace: &Namespace,
1991 ) -> AvroResult<Schema> {
1992 let size_opt = complex.get("size");
1993 if size_opt.is_none() {
1994 if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
1995 return Ok(seen.clone());
1996 }
1997 }
1998
1999 let doc = complex.get("doc").and_then(|v| match &v {
2000 Value::String(ref docstr) => Some(docstr.clone()),
2001 _ => None,
2002 });
2003
2004 let size = match size_opt {
2005 Some(size) => size
2006 .as_u64()
2007 .ok_or_else(|| Error::GetFixedSizeFieldPositive(size.clone())),
2008 None => Err(Error::GetFixedSizeField),
2009 }?;
2010
2011 let default = complex.get("default").and_then(|v| match &v {
2012 Value::String(ref default) => Some(default.clone()),
2013 _ => None,
2014 });
2015
2016 if default.is_some() {
2017 let len = default.clone().unwrap().len();
2018 if len != size as usize {
2019 return Err(Error::FixedDefaultLenSizeMismatch(len, size));
2020 }
2021 }
2022
2023 let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
2024 let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
2025
2026 let schema = Schema::Fixed(FixedSchema {
2027 name: fully_qualified_name.clone(),
2028 aliases: aliases.clone(),
2029 doc,
2030 size: size as usize,
2031 default,
2032 attributes: self.get_custom_attributes(complex, vec!["size"]),
2033 });
2034
2035 self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
2036
2037 Ok(schema)
2038 }
2039}
2040
2041fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases {
2047 aliases.map(|aliases| {
2048 aliases
2049 .iter()
2050 .map(|alias| {
2051 if alias.find('.').is_none() {
2052 match namespace {
2053 Some(ref ns) => format!("{ns}.{alias}"),
2054 None => alias.clone(),
2055 }
2056 } else {
2057 alias.clone()
2058 }
2059 })
2060 .map(|alias| Alias::new(alias.as_str()).unwrap())
2061 .collect()
2062 })
2063}
2064
2065fn get_schema_type_name(name: Name, value: Value) -> Name {
2066 match value.get("type") {
2067 Some(Value::Object(complex_type)) => match complex_type.name() {
2068 Some(name) => Name::new(name.as_str()).unwrap(),
2069 _ => name,
2070 },
2071 _ => name,
2072 }
2073}
2074
2075impl Serialize for Schema {
2076 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2077 where
2078 S: Serializer,
2079 {
2080 match *self {
2081 Schema::Ref { ref name } => serializer.serialize_str(&name.fullname(None)),
2082 Schema::Null => serializer.serialize_str("null"),
2083 Schema::Boolean => serializer.serialize_str("boolean"),
2084 Schema::Int => serializer.serialize_str("int"),
2085 Schema::Long => serializer.serialize_str("long"),
2086 Schema::Float => serializer.serialize_str("float"),
2087 Schema::Double => serializer.serialize_str("double"),
2088 Schema::Bytes => serializer.serialize_str("bytes"),
2089 Schema::String => serializer.serialize_str("string"),
2090 Schema::Array(ref inner) => {
2091 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2092 map.serialize_entry("type", "array")?;
2093 map.serialize_entry("items", &*inner.items.clone())?;
2094 for attr in &inner.attributes {
2095 map.serialize_entry(attr.0, attr.1)?;
2096 }
2097 map.end()
2098 }
2099 Schema::Map(ref inner) => {
2100 let mut map = serializer.serialize_map(Some(2 + inner.attributes.len()))?;
2101 map.serialize_entry("type", "map")?;
2102 map.serialize_entry("values", &*inner.types.clone())?;
2103 for attr in &inner.attributes {
2104 map.serialize_entry(attr.0, attr.1)?;
2105 }
2106 map.end()
2107 }
2108 Schema::Union(ref inner) => {
2109 let variants = inner.variants();
2110 let mut seq = serializer.serialize_seq(Some(variants.len()))?;
2111 for v in variants {
2112 seq.serialize_element(v)?;
2113 }
2114 seq.end()
2115 }
2116 Schema::Record(RecordSchema {
2117 ref name,
2118 ref aliases,
2119 ref doc,
2120 ref fields,
2121 ref attributes,
2122 ..
2123 }) => {
2124 let mut map = serializer.serialize_map(None)?;
2125 map.serialize_entry("type", "record")?;
2126 if let Some(ref n) = name.namespace {
2127 map.serialize_entry("namespace", n)?;
2128 }
2129 map.serialize_entry("name", &name.name)?;
2130 if let Some(ref docstr) = doc {
2131 map.serialize_entry("doc", docstr)?;
2132 }
2133 if let Some(ref aliases) = aliases {
2134 map.serialize_entry("aliases", aliases)?;
2135 }
2136 map.serialize_entry("fields", fields)?;
2137 for attr in attributes {
2138 map.serialize_entry(attr.0, attr.1)?;
2139 }
2140 map.end()
2141 }
2142 Schema::Enum(EnumSchema {
2143 ref name,
2144 ref symbols,
2145 ref aliases,
2146 ref attributes,
2147 ..
2148 }) => {
2149 let mut map = serializer.serialize_map(None)?;
2150 map.serialize_entry("type", "enum")?;
2151 if let Some(ref n) = name.namespace {
2152 map.serialize_entry("namespace", n)?;
2153 }
2154 map.serialize_entry("name", &name.name)?;
2155 map.serialize_entry("symbols", symbols)?;
2156
2157 if let Some(ref aliases) = aliases {
2158 map.serialize_entry("aliases", aliases)?;
2159 }
2160 for attr in attributes {
2161 map.serialize_entry(attr.0, attr.1)?;
2162 }
2163 map.end()
2164 }
2165 Schema::Fixed(ref fixed_schema) => {
2166 let mut map = serializer.serialize_map(None)?;
2167 map = fixed_schema.serialize_to_map::<S>(map)?;
2168 map.end()
2169 }
2170 Schema::Decimal(DecimalSchema {
2171 ref scale,
2172 ref precision,
2173 ref inner,
2174 }) => {
2175 let mut map = serializer.serialize_map(None)?;
2176 match inner.as_ref() {
2177 Schema::Fixed(fixed_schema) => {
2178 map = fixed_schema.serialize_to_map::<S>(map)?;
2179 }
2180 Schema::Bytes => {
2181 map.serialize_entry("type", "bytes")?;
2182 }
2183 others => {
2184 return Err(serde::ser::Error::custom(format!(
2185 "DecimalSchema inner type must be Fixed or Bytes, got {:?}",
2186 SchemaKind::from(others)
2187 )));
2188 }
2189 }
2190 map.serialize_entry("logicalType", "decimal")?;
2191 map.serialize_entry("scale", scale)?;
2192 map.serialize_entry("precision", precision)?;
2193 map.end()
2194 }
2195
2196 Schema::BigDecimal => {
2197 let mut map = serializer.serialize_map(None)?;
2198 map.serialize_entry("type", "bytes")?;
2199 map.serialize_entry("logicalType", "big-decimal")?;
2200 map.end()
2201 }
2202 Schema::Uuid => {
2203 let mut map = serializer.serialize_map(None)?;
2204 map.serialize_entry("type", "string")?;
2205 map.serialize_entry("logicalType", "uuid")?;
2206 map.end()
2207 }
2208 Schema::Date => {
2209 let mut map = serializer.serialize_map(None)?;
2210 map.serialize_entry("type", "int")?;
2211 map.serialize_entry("logicalType", "date")?;
2212 map.end()
2213 }
2214 Schema::TimeMillis => {
2215 let mut map = serializer.serialize_map(None)?;
2216 map.serialize_entry("type", "int")?;
2217 map.serialize_entry("logicalType", "time-millis")?;
2218 map.end()
2219 }
2220 Schema::TimeMicros => {
2221 let mut map = serializer.serialize_map(None)?;
2222 map.serialize_entry("type", "long")?;
2223 map.serialize_entry("logicalType", "time-micros")?;
2224 map.end()
2225 }
2226 Schema::TimestampMillis => {
2227 let mut map = serializer.serialize_map(None)?;
2228 map.serialize_entry("type", "long")?;
2229 map.serialize_entry("logicalType", "timestamp-millis")?;
2230 map.end()
2231 }
2232 Schema::TimestampMicros => {
2233 let mut map = serializer.serialize_map(None)?;
2234 map.serialize_entry("type", "long")?;
2235 map.serialize_entry("logicalType", "timestamp-micros")?;
2236 map.end()
2237 }
2238 Schema::TimestampNanos => {
2239 let mut map = serializer.serialize_map(None)?;
2240 map.serialize_entry("type", "long")?;
2241 map.serialize_entry("logicalType", "timestamp-nanos")?;
2242 map.end()
2243 }
2244 Schema::LocalTimestampMillis => {
2245 let mut map = serializer.serialize_map(None)?;
2246 map.serialize_entry("type", "long")?;
2247 map.serialize_entry("logicalType", "local-timestamp-millis")?;
2248 map.end()
2249 }
2250 Schema::LocalTimestampMicros => {
2251 let mut map = serializer.serialize_map(None)?;
2252 map.serialize_entry("type", "long")?;
2253 map.serialize_entry("logicalType", "local-timestamp-micros")?;
2254 map.end()
2255 }
2256 Schema::LocalTimestampNanos => {
2257 let mut map = serializer.serialize_map(None)?;
2258 map.serialize_entry("type", "long")?;
2259 map.serialize_entry("logicalType", "local-timestamp-nanos")?;
2260 map.end()
2261 }
2262 Schema::Duration => {
2263 let mut map = serializer.serialize_map(None)?;
2264
2265 let inner = Schema::Fixed(FixedSchema {
2268 name: Name::new("duration").unwrap(),
2269 aliases: None,
2270 doc: None,
2271 size: 12,
2272 default: None,
2273 attributes: Default::default(),
2274 });
2275 map.serialize_entry("type", &inner)?;
2276 map.serialize_entry("logicalType", "duration")?;
2277 map.end()
2278 }
2279 }
2280 }
2281}
2282
2283impl Serialize for RecordField {
2284 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2285 where
2286 S: Serializer,
2287 {
2288 let mut map = serializer.serialize_map(None)?;
2289 map.serialize_entry("name", &self.name)?;
2290 map.serialize_entry("type", &self.schema)?;
2291
2292 if let Some(ref default) = self.default {
2293 map.serialize_entry("default", default)?;
2294 }
2295
2296 if let Some(ref aliases) = self.aliases {
2297 map.serialize_entry("aliases", aliases)?;
2298 }
2299
2300 for attr in &self.custom_attributes {
2301 map.serialize_entry(attr.0, attr.1)?;
2302 }
2303
2304 map.end()
2305 }
2306}
2307
2308fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet<String>) -> String {
2311 match schema {
2312 Value::Object(map) => pcf_map(map, defined_names),
2313 Value::String(s) => pcf_string(s),
2314 Value::Array(v) => pcf_array(v, defined_names),
2315 json => panic!("got invalid JSON value for canonical form of schema: {json}"),
2316 }
2317}
2318
2319fn pcf_map(schema: &Map<String, Value>, defined_names: &mut HashSet<String>) -> String {
2320 let ns = schema.get("namespace").and_then(|v| v.as_str());
2322 let typ = schema.get("type").and_then(|v| v.as_str());
2323 let raw_name = schema.get("name").and_then(|v| v.as_str());
2324 let name = if is_named_type(typ) {
2325 Some(format!(
2326 "{}{}",
2327 ns.map_or("".to_string(), |n| { format!("{n}.") }),
2328 raw_name.unwrap_or_default()
2329 ))
2330 } else {
2331 None
2332 };
2333
2334 if let Some(ref n) = name {
2336 if defined_names.contains(n) {
2337 return pcf_string(n);
2338 } else {
2339 defined_names.insert(n.clone());
2340 }
2341 }
2342
2343 let mut fields = Vec::new();
2344 for (k, v) in schema {
2345 if schema.len() == 1 && k == "type" {
2347 if let Value::String(s) = v {
2349 return pcf_string(s);
2350 }
2351 }
2352
2353 if field_ordering_position(k).is_none()
2355 || k == "default"
2356 || k == "doc"
2357 || k == "aliases"
2358 || k == "logicalType"
2359 {
2360 continue;
2361 }
2362
2363 if k == "name" {
2365 if let Some(ref n) = name {
2366 fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n))));
2367 continue;
2368 }
2369 }
2370
2371 if k == "size" || k == "precision" || k == "scale" {
2373 let i = match v.as_str() {
2374 Some(s) => s.parse::<i64>().expect("Only valid schemas are accepted!"),
2375 None => v.as_i64().unwrap(),
2376 };
2377 fields.push((k, format!("{}:{}", pcf_string(k), i)));
2378 continue;
2379 }
2380
2381 fields.push((
2383 k,
2384 format!(
2385 "{}:{}",
2386 pcf_string(k),
2387 parsing_canonical_form(v, defined_names)
2388 ),
2389 ));
2390 }
2391
2392 fields.sort_unstable_by_key(|(k, _)| field_ordering_position(k).unwrap());
2394 let inter = fields
2395 .into_iter()
2396 .map(|(_, v)| v)
2397 .collect::<Vec<_>>()
2398 .join(",");
2399 format!("{{{inter}}}")
2400}
2401
2402fn is_named_type(typ: Option<&str>) -> bool {
2403 matches!(
2404 typ,
2405 Some("record") | Some("enum") | Some("fixed") | Some("ref")
2406 )
2407}
2408
2409fn pcf_array(arr: &[Value], defined_names: &mut HashSet<String>) -> String {
2410 let inter = arr
2411 .iter()
2412 .map(|a| parsing_canonical_form(a, defined_names))
2413 .collect::<Vec<String>>()
2414 .join(",");
2415 format!("[{inter}]")
2416}
2417
2418fn pcf_string(s: &str) -> String {
2419 format!("\"{s}\"")
2420}
2421
2422const RESERVED_FIELDS: &[&str] = &[
2423 "name",
2424 "type",
2425 "fields",
2426 "symbols",
2427 "items",
2428 "values",
2429 "size",
2430 "logicalType",
2431 "order",
2432 "doc",
2433 "aliases",
2434 "default",
2435 "precision",
2436 "scale",
2437];
2438
2439fn field_ordering_position(field: &str) -> Option<usize> {
2441 RESERVED_FIELDS
2442 .iter()
2443 .position(|&f| f == field)
2444 .map(|pos| pos + 1)
2445}
2446
2447pub trait AvroSchema {
2452 fn get_schema() -> Schema;
2453}
2454
2455#[cfg(feature = "derive")]
2456pub mod derive {
2457 use super::*;
2458 use std::borrow::Cow;
2459
2460 pub trait AvroSchemaComponent {
2509 fn get_schema_in_ctxt(named_schemas: &mut Names, enclosing_namespace: &Namespace)
2510 -> Schema;
2511 }
2512
2513 impl<T> AvroSchema for T
2514 where
2515 T: AvroSchemaComponent,
2516 {
2517 fn get_schema() -> Schema {
2518 T::get_schema_in_ctxt(&mut HashMap::default(), &None)
2519 }
2520 }
2521
2522 macro_rules! impl_schema (
2523 ($type:ty, $variant_constructor:expr) => (
2524 impl AvroSchemaComponent for $type {
2525 fn get_schema_in_ctxt(_: &mut Names, _: &Namespace) -> Schema {
2526 $variant_constructor
2527 }
2528 }
2529 );
2530 );
2531
2532 impl_schema!(bool, Schema::Boolean);
2533 impl_schema!(i8, Schema::Int);
2534 impl_schema!(i16, Schema::Int);
2535 impl_schema!(i32, Schema::Int);
2536 impl_schema!(i64, Schema::Long);
2537 impl_schema!(u8, Schema::Int);
2538 impl_schema!(u16, Schema::Int);
2539 impl_schema!(u32, Schema::Long);
2540 impl_schema!(f32, Schema::Float);
2541 impl_schema!(f64, Schema::Double);
2542 impl_schema!(String, Schema::String);
2543 impl_schema!(uuid::Uuid, Schema::Uuid);
2544 impl_schema!(core::time::Duration, Schema::Duration);
2545
2546 impl<T> AvroSchemaComponent for Vec<T>
2547 where
2548 T: AvroSchemaComponent,
2549 {
2550 fn get_schema_in_ctxt(
2551 named_schemas: &mut Names,
2552 enclosing_namespace: &Namespace,
2553 ) -> Schema {
2554 Schema::array(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2555 }
2556 }
2557
2558 impl<T> AvroSchemaComponent for Option<T>
2559 where
2560 T: AvroSchemaComponent,
2561 {
2562 fn get_schema_in_ctxt(
2563 named_schemas: &mut Names,
2564 enclosing_namespace: &Namespace,
2565 ) -> Schema {
2566 let inner_schema = T::get_schema_in_ctxt(named_schemas, enclosing_namespace);
2567 Schema::Union(UnionSchema {
2568 schemas: vec![Schema::Null, inner_schema.clone()],
2569 variant_index: vec![Schema::Null, inner_schema]
2570 .iter()
2571 .enumerate()
2572 .map(|(idx, s)| (SchemaKind::from(s), idx))
2573 .collect(),
2574 })
2575 }
2576 }
2577
2578 impl<T> AvroSchemaComponent for Map<String, T>
2579 where
2580 T: AvroSchemaComponent,
2581 {
2582 fn get_schema_in_ctxt(
2583 named_schemas: &mut Names,
2584 enclosing_namespace: &Namespace,
2585 ) -> Schema {
2586 Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2587 }
2588 }
2589
2590 impl<T> AvroSchemaComponent for HashMap<String, T>
2591 where
2592 T: AvroSchemaComponent,
2593 {
2594 fn get_schema_in_ctxt(
2595 named_schemas: &mut Names,
2596 enclosing_namespace: &Namespace,
2597 ) -> Schema {
2598 Schema::map(T::get_schema_in_ctxt(named_schemas, enclosing_namespace))
2599 }
2600 }
2601
2602 impl<T> AvroSchemaComponent for Box<T>
2603 where
2604 T: AvroSchemaComponent,
2605 {
2606 fn get_schema_in_ctxt(
2607 named_schemas: &mut Names,
2608 enclosing_namespace: &Namespace,
2609 ) -> Schema {
2610 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2611 }
2612 }
2613
2614 impl<T> AvroSchemaComponent for std::sync::Mutex<T>
2615 where
2616 T: AvroSchemaComponent,
2617 {
2618 fn get_schema_in_ctxt(
2619 named_schemas: &mut Names,
2620 enclosing_namespace: &Namespace,
2621 ) -> Schema {
2622 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2623 }
2624 }
2625
2626 impl<T> AvroSchemaComponent for Cow<'_, T>
2627 where
2628 T: AvroSchemaComponent + Clone,
2629 {
2630 fn get_schema_in_ctxt(
2631 named_schemas: &mut Names,
2632 enclosing_namespace: &Namespace,
2633 ) -> Schema {
2634 T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
2635 }
2636 }
2637}
2638
2639#[cfg(test)]
2640mod tests {
2641 use super::*;
2642 use crate::{rabin::Rabin, SpecificSingleObjectWriter};
2643 use apache_avro_test_helper::{
2644 logger::{assert_logged, assert_not_logged},
2645 TestResult,
2646 };
2647 use serde_json::json;
2648 use serial_test::serial;
2649 use std::sync::atomic::Ordering;
2650
2651 #[test]
2652 fn test_invalid_schema() {
2653 assert!(Schema::parse_str("invalid").is_err());
2654 }
2655
2656 #[test]
2657 fn test_primitive_schema() -> TestResult {
2658 assert_eq!(Schema::Null, Schema::parse_str("\"null\"")?);
2659 assert_eq!(Schema::Int, Schema::parse_str("\"int\"")?);
2660 assert_eq!(Schema::Double, Schema::parse_str("\"double\"")?);
2661 Ok(())
2662 }
2663
2664 #[test]
2665 fn test_array_schema() -> TestResult {
2666 let schema = Schema::parse_str(r#"{"type": "array", "items": "string"}"#)?;
2667 assert_eq!(Schema::array(Schema::String), schema);
2668 Ok(())
2669 }
2670
2671 #[test]
2672 fn test_map_schema() -> TestResult {
2673 let schema = Schema::parse_str(r#"{"type": "map", "values": "double"}"#)?;
2674 assert_eq!(Schema::map(Schema::Double), schema);
2675 Ok(())
2676 }
2677
2678 #[test]
2679 fn test_union_schema() -> TestResult {
2680 let schema = Schema::parse_str(r#"["null", "int"]"#)?;
2681 assert_eq!(
2682 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int])?),
2683 schema
2684 );
2685 Ok(())
2686 }
2687
2688 #[test]
2689 fn test_union_unsupported_schema() {
2690 let schema = Schema::parse_str(r#"["null", ["null", "int"], "string"]"#);
2691 assert!(schema.is_err());
2692 }
2693
2694 #[test]
2695 fn test_multi_union_schema() -> TestResult {
2696 let schema = Schema::parse_str(r#"["null", "int", "float", "string", "bytes"]"#);
2697 assert!(schema.is_ok());
2698 let schema = schema?;
2699 assert_eq!(SchemaKind::from(&schema), SchemaKind::Union);
2700 let union_schema = match schema {
2701 Schema::Union(u) => u,
2702 _ => unreachable!(),
2703 };
2704 assert_eq!(union_schema.variants().len(), 5);
2705 let mut variants = union_schema.variants().iter();
2706 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Null);
2707 assert_eq!(SchemaKind::from(variants.next().unwrap()), SchemaKind::Int);
2708 assert_eq!(
2709 SchemaKind::from(variants.next().unwrap()),
2710 SchemaKind::Float
2711 );
2712 assert_eq!(
2713 SchemaKind::from(variants.next().unwrap()),
2714 SchemaKind::String
2715 );
2716 assert_eq!(
2717 SchemaKind::from(variants.next().unwrap()),
2718 SchemaKind::Bytes
2719 );
2720 assert_eq!(variants.next(), None);
2721
2722 Ok(())
2723 }
2724
2725 #[test]
2726 fn test_avro_3621_nullable_record_field() -> TestResult {
2727 let nullable_record_field = RecordField::builder()
2728 .name("next".to_string())
2729 .schema(Schema::Union(UnionSchema::new(vec![
2730 Schema::Null,
2731 Schema::Ref {
2732 name: Name {
2733 name: "LongList".to_owned(),
2734 namespace: None,
2735 },
2736 },
2737 ])?))
2738 .order(RecordFieldOrder::Ascending)
2739 .position(1)
2740 .build();
2741
2742 assert!(nullable_record_field.is_nullable());
2743
2744 let non_nullable_record_field = RecordField::builder()
2745 .name("next".to_string())
2746 .default(json!(2))
2747 .schema(Schema::Long)
2748 .order(RecordFieldOrder::Ascending)
2749 .position(1)
2750 .build();
2751
2752 assert!(!non_nullable_record_field.is_nullable());
2753 Ok(())
2754 }
2755
2756 #[test]
2758 fn test_union_of_records() -> TestResult {
2759 use std::iter::FromIterator;
2760
2761 let schema_str_a = r#"{
2763 "name": "A",
2764 "type": "record",
2765 "fields": [
2766 {"name": "field_one", "type": "float"}
2767 ]
2768 }"#;
2769
2770 let schema_str_b = r#"{
2771 "name": "B",
2772 "type": "record",
2773 "fields": [
2774 {"name": "field_one", "type": "float"}
2775 ]
2776 }"#;
2777
2778 let schema_str_c = r#"{
2780 "name": "C",
2781 "type": "record",
2782 "fields": [
2783 {"name": "field_one", "type": ["A", "B"]}
2784 ]
2785 }"#;
2786
2787 let schema_c = Schema::parse_list([schema_str_a, schema_str_b, schema_str_c])?
2788 .last()
2789 .unwrap()
2790 .clone();
2791
2792 let schema_c_expected = Schema::Record(
2793 RecordSchema::builder()
2794 .name(Name::new("C")?)
2795 .fields(vec![RecordField::builder()
2796 .name("field_one".to_string())
2797 .schema(Schema::Union(UnionSchema::new(vec![
2798 Schema::Ref {
2799 name: Name::new("A")?,
2800 },
2801 Schema::Ref {
2802 name: Name::new("B")?,
2803 },
2804 ])?))
2805 .build()])
2806 .lookup(BTreeMap::from_iter(vec![("field_one".to_string(), 0)]))
2807 .build(),
2808 );
2809
2810 assert_eq!(schema_c, schema_c_expected);
2811 Ok(())
2812 }
2813
2814 #[test]
2815 fn avro_rs_104_test_root_union_of_records() -> TestResult {
2816 let schema_str_a = r#"{
2818 "name": "A",
2819 "type": "record",
2820 "fields": [
2821 {"name": "field_one", "type": "float"}
2822 ]
2823 }"#;
2824
2825 let schema_str_b = r#"{
2826 "name": "B",
2827 "type": "record",
2828 "fields": [
2829 {"name": "field_one", "type": "float"}
2830 ]
2831 }"#;
2832
2833 let schema_str_c = r#"["A", "B"]"#;
2834
2835 let (schema_c, schemata) =
2836 Schema::parse_str_with_list(schema_str_c, [schema_str_a, schema_str_b])?;
2837
2838 let schema_a_expected = Schema::Record(RecordSchema {
2839 name: Name::new("A")?,
2840 aliases: None,
2841 doc: None,
2842 fields: vec![RecordField {
2843 name: "field_one".to_string(),
2844 doc: None,
2845 default: None,
2846 aliases: None,
2847 schema: Schema::Float,
2848 order: RecordFieldOrder::Ignore,
2849 position: 0,
2850 custom_attributes: Default::default(),
2851 }],
2852 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2853 attributes: Default::default(),
2854 });
2855
2856 let schema_b_expected = Schema::Record(RecordSchema {
2857 name: Name::new("B")?,
2858 aliases: None,
2859 doc: None,
2860 fields: vec![RecordField {
2861 name: "field_one".to_string(),
2862 doc: None,
2863 default: None,
2864 aliases: None,
2865 schema: Schema::Float,
2866 order: RecordFieldOrder::Ignore,
2867 position: 0,
2868 custom_attributes: Default::default(),
2869 }],
2870 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
2871 attributes: Default::default(),
2872 });
2873
2874 let schema_c_expected = Schema::Union(UnionSchema::new(vec![
2875 Schema::Ref {
2876 name: Name::new("A")?,
2877 },
2878 Schema::Ref {
2879 name: Name::new("B")?,
2880 },
2881 ])?);
2882
2883 assert_eq!(schema_c, schema_c_expected);
2884 assert_eq!(schemata[0], schema_a_expected);
2885 assert_eq!(schemata[1], schema_b_expected);
2886
2887 Ok(())
2888 }
2889
2890 #[test]
2891 fn avro_rs_104_test_root_union_of_records_name_collision() -> TestResult {
2892 let schema_str_a1 = r#"{
2894 "name": "A",
2895 "type": "record",
2896 "fields": [
2897 {"name": "field_one", "type": "float"}
2898 ]
2899 }"#;
2900
2901 let schema_str_a2 = r#"{
2902 "name": "A",
2903 "type": "record",
2904 "fields": [
2905 {"name": "field_one", "type": "float"}
2906 ]
2907 }"#;
2908
2909 let schema_str_c = r#"["A", "A"]"#;
2910
2911 match Schema::parse_str_with_list(schema_str_c, [schema_str_a1, schema_str_a2]) {
2912 Ok(_) => unreachable!("Expected an error that the name is already defined"),
2913 Err(e) => assert_eq!(
2914 e.to_string(),
2915 "Two schemas with the same fullname were given: \"A\""
2916 ),
2917 }
2918
2919 Ok(())
2920 }
2921
2922 #[test]
2923 fn avro_rs_104_test_root_union_of_records_no_name() -> TestResult {
2924 let schema_str_a = r#"{
2925 "name": "A",
2926 "type": "record",
2927 "fields": [
2928 {"name": "field_one", "type": "float"}
2929 ]
2930 }"#;
2931
2932 let schema_str_b = r#"{
2934 "type": "record",
2935 "fields": [
2936 {"name": "field_one", "type": "float"}
2937 ]
2938 }"#;
2939
2940 let schema_str_c = r#"["A", "A"]"#;
2941
2942 match Schema::parse_str_with_list(schema_str_c, [schema_str_a, schema_str_b]) {
2943 Ok(_) => unreachable!("Expected an error that schema_str_b is missing a name field"),
2944 Err(e) => assert_eq!(e.to_string(), "No `name` field"),
2945 }
2946
2947 Ok(())
2948 }
2949
2950 #[test]
2951 fn avro_3584_test_recursion_records() -> TestResult {
2952 let schema_str_a = r#"{
2954 "name": "A",
2955 "type": "record",
2956 "fields": [ {"name": "field_one", "type": "B"} ]
2957 }"#;
2958
2959 let schema_str_b = r#"{
2960 "name": "B",
2961 "type": "record",
2962 "fields": [ {"name": "field_one", "type": "A"} ]
2963 }"#;
2964
2965 let list = Schema::parse_list([schema_str_a, schema_str_b])?;
2966
2967 let schema_a = list.first().unwrap().clone();
2968
2969 match schema_a {
2970 Schema::Record(RecordSchema { fields, .. }) => {
2971 let f1 = fields.first();
2972
2973 let ref_schema = Schema::Ref {
2974 name: Name::new("B")?,
2975 };
2976 assert_eq!(ref_schema, f1.unwrap().schema);
2977 }
2978 _ => panic!("Expected a record schema!"),
2979 }
2980
2981 Ok(())
2982 }
2983
2984 #[test]
2985 fn test_avro_3248_nullable_record() -> TestResult {
2986 use std::iter::FromIterator;
2987
2988 let schema_str_a = r#"{
2989 "name": "A",
2990 "type": "record",
2991 "fields": [
2992 {"name": "field_one", "type": "float"}
2993 ]
2994 }"#;
2995
2996 let schema_str_option_a = r#"{
2998 "name": "OptionA",
2999 "type": "record",
3000 "fields": [
3001 {"name": "field_one", "type": ["null", "A"], "default": null}
3002 ]
3003 }"#;
3004
3005 let schema_option_a = Schema::parse_list([schema_str_a, schema_str_option_a])?
3006 .last()
3007 .unwrap()
3008 .clone();
3009
3010 let schema_option_a_expected = Schema::Record(RecordSchema {
3011 name: Name::new("OptionA")?,
3012 aliases: None,
3013 doc: None,
3014 fields: vec![RecordField {
3015 name: "field_one".to_string(),
3016 doc: None,
3017 default: Some(Value::Null),
3018 aliases: None,
3019 schema: Schema::Union(UnionSchema::new(vec![
3020 Schema::Null,
3021 Schema::Ref {
3022 name: Name::new("A")?,
3023 },
3024 ])?),
3025 order: RecordFieldOrder::Ignore,
3026 position: 0,
3027 custom_attributes: Default::default(),
3028 }],
3029 lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
3030 attributes: Default::default(),
3031 });
3032
3033 assert_eq!(schema_option_a, schema_option_a_expected);
3034
3035 Ok(())
3036 }
3037
3038 #[test]
3039 fn test_record_schema() -> TestResult {
3040 let parsed = Schema::parse_str(
3041 r#"
3042 {
3043 "type": "record",
3044 "name": "test",
3045 "fields": [
3046 {"name": "a", "type": "long", "default": 42},
3047 {"name": "b", "type": "string"}
3048 ]
3049 }
3050 "#,
3051 )?;
3052
3053 let mut lookup = BTreeMap::new();
3054 lookup.insert("a".to_owned(), 0);
3055 lookup.insert("b".to_owned(), 1);
3056
3057 let expected = Schema::Record(RecordSchema {
3058 name: Name::new("test")?,
3059 aliases: None,
3060 doc: None,
3061 fields: vec![
3062 RecordField {
3063 name: "a".to_string(),
3064 doc: None,
3065 default: Some(Value::Number(42i64.into())),
3066 aliases: None,
3067 schema: Schema::Long,
3068 order: RecordFieldOrder::Ascending,
3069 position: 0,
3070 custom_attributes: Default::default(),
3071 },
3072 RecordField {
3073 name: "b".to_string(),
3074 doc: None,
3075 default: None,
3076 aliases: None,
3077 schema: Schema::String,
3078 order: RecordFieldOrder::Ascending,
3079 position: 1,
3080 custom_attributes: Default::default(),
3081 },
3082 ],
3083 lookup,
3084 attributes: Default::default(),
3085 });
3086
3087 assert_eq!(parsed, expected);
3088
3089 Ok(())
3090 }
3091
3092 #[test]
3093 fn test_avro_3302_record_schema_with_currently_parsing_schema() -> TestResult {
3094 let schema = Schema::parse_str(
3095 r#"
3096 {
3097 "type": "record",
3098 "name": "test",
3099 "fields": [{
3100 "name": "recordField",
3101 "type": {
3102 "type": "record",
3103 "name": "Node",
3104 "fields": [
3105 {"name": "label", "type": "string"},
3106 {"name": "children", "type": {"type": "array", "items": "Node"}}
3107 ]
3108 }
3109 }]
3110 }
3111 "#,
3112 )?;
3113
3114 let mut lookup = BTreeMap::new();
3115 lookup.insert("recordField".to_owned(), 0);
3116
3117 let mut node_lookup = BTreeMap::new();
3118 node_lookup.insert("children".to_owned(), 1);
3119 node_lookup.insert("label".to_owned(), 0);
3120
3121 let expected = Schema::Record(RecordSchema {
3122 name: Name::new("test")?,
3123 aliases: None,
3124 doc: None,
3125 fields: vec![RecordField {
3126 name: "recordField".to_string(),
3127 doc: None,
3128 default: None,
3129 aliases: None,
3130 schema: Schema::Record(RecordSchema {
3131 name: Name::new("Node")?,
3132 aliases: None,
3133 doc: None,
3134 fields: vec![
3135 RecordField {
3136 name: "label".to_string(),
3137 doc: None,
3138 default: None,
3139 aliases: None,
3140 schema: Schema::String,
3141 order: RecordFieldOrder::Ascending,
3142 position: 0,
3143 custom_attributes: Default::default(),
3144 },
3145 RecordField {
3146 name: "children".to_string(),
3147 doc: None,
3148 default: None,
3149 aliases: None,
3150 schema: Schema::array(Schema::Ref {
3151 name: Name::new("Node")?,
3152 }),
3153 order: RecordFieldOrder::Ascending,
3154 position: 1,
3155 custom_attributes: Default::default(),
3156 },
3157 ],
3158 lookup: node_lookup,
3159 attributes: Default::default(),
3160 }),
3161 order: RecordFieldOrder::Ascending,
3162 position: 0,
3163 custom_attributes: Default::default(),
3164 }],
3165 lookup,
3166 attributes: Default::default(),
3167 });
3168 assert_eq!(schema, expected);
3169
3170 let canonical_form = &schema.canonical_form();
3171 let expected = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
3172 assert_eq!(canonical_form, &expected);
3173
3174 Ok(())
3175 }
3176
3177 #[test]
3179 fn test_parsing_of_recursive_type_enum() -> TestResult {
3180 let schema = r#"
3181 {
3182 "type": "record",
3183 "name": "User",
3184 "namespace": "office",
3185 "fields": [
3186 {
3187 "name": "details",
3188 "type": [
3189 {
3190 "type": "record",
3191 "name": "Employee",
3192 "fields": [
3193 {
3194 "name": "gender",
3195 "type": {
3196 "type": "enum",
3197 "name": "Gender",
3198 "symbols": [
3199 "male",
3200 "female"
3201 ]
3202 },
3203 "default": "female"
3204 }
3205 ]
3206 },
3207 {
3208 "type": "record",
3209 "name": "Manager",
3210 "fields": [
3211 {
3212 "name": "gender",
3213 "type": "Gender"
3214 }
3215 ]
3216 }
3217 ]
3218 }
3219 ]
3220 }
3221 "#;
3222
3223 let schema = Schema::parse_str(schema)?;
3224 let schema_str = schema.canonical_form();
3225 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"gender","type":{"name":"office.Gender","type":"enum","symbols":["male","female"]}}]},{"name":"office.Manager","type":"record","fields":[{"name":"gender","type":"office.Gender"}]}]}]}"#;
3226 assert_eq!(schema_str, expected);
3227
3228 Ok(())
3229 }
3230
3231 #[test]
3232 fn test_parsing_of_recursive_type_fixed() -> TestResult {
3233 let schema = r#"
3234 {
3235 "type": "record",
3236 "name": "User",
3237 "namespace": "office",
3238 "fields": [
3239 {
3240 "name": "details",
3241 "type": [
3242 {
3243 "type": "record",
3244 "name": "Employee",
3245 "fields": [
3246 {
3247 "name": "id",
3248 "type": {
3249 "type": "fixed",
3250 "name": "EmployeeId",
3251 "size": 16
3252 },
3253 "default": "female"
3254 }
3255 ]
3256 },
3257 {
3258 "type": "record",
3259 "name": "Manager",
3260 "fields": [
3261 {
3262 "name": "id",
3263 "type": "EmployeeId"
3264 }
3265 ]
3266 }
3267 ]
3268 }
3269 ]
3270 }
3271 "#;
3272
3273 let schema = Schema::parse_str(schema)?;
3274 let schema_str = schema.canonical_form();
3275 let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"office.Employee","type":"record","fields":[{"name":"id","type":{"name":"office.EmployeeId","type":"fixed","size":16}}]},{"name":"office.Manager","type":"record","fields":[{"name":"id","type":"office.EmployeeId"}]}]}]}"#;
3276 assert_eq!(schema_str, expected);
3277
3278 Ok(())
3279 }
3280
3281 #[test]
3282 fn test_avro_3302_record_schema_with_currently_parsing_schema_aliases() -> TestResult {
3283 let schema = Schema::parse_str(
3284 r#"
3285 {
3286 "type": "record",
3287 "name": "LongList",
3288 "aliases": ["LinkedLongs"],
3289 "fields" : [
3290 {"name": "value", "type": "long"},
3291 {"name": "next", "type": ["null", "LinkedLongs"]}
3292 ]
3293 }
3294 "#,
3295 )?;
3296
3297 let mut lookup = BTreeMap::new();
3298 lookup.insert("value".to_owned(), 0);
3299 lookup.insert("next".to_owned(), 1);
3300
3301 let expected = Schema::Record(RecordSchema {
3302 name: Name {
3303 name: "LongList".to_owned(),
3304 namespace: None,
3305 },
3306 aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
3307 doc: None,
3308 fields: vec![
3309 RecordField {
3310 name: "value".to_string(),
3311 doc: None,
3312 default: None,
3313 aliases: None,
3314 schema: Schema::Long,
3315 order: RecordFieldOrder::Ascending,
3316 position: 0,
3317 custom_attributes: Default::default(),
3318 },
3319 RecordField {
3320 name: "next".to_string(),
3321 doc: None,
3322 default: None,
3323 aliases: None,
3324 schema: Schema::Union(UnionSchema::new(vec![
3325 Schema::Null,
3326 Schema::Ref {
3327 name: Name {
3328 name: "LongList".to_owned(),
3329 namespace: None,
3330 },
3331 },
3332 ])?),
3333 order: RecordFieldOrder::Ascending,
3334 position: 1,
3335 custom_attributes: Default::default(),
3336 },
3337 ],
3338 lookup,
3339 attributes: Default::default(),
3340 });
3341 assert_eq!(schema, expected);
3342
3343 let canonical_form = &schema.canonical_form();
3344 let expected = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
3345 assert_eq!(canonical_form, &expected);
3346
3347 Ok(())
3348 }
3349
3350 #[test]
3351 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_record() -> TestResult {
3352 let schema = Schema::parse_str(
3353 r#"
3354 {
3355 "type" : "record",
3356 "name" : "record",
3357 "fields" : [
3358 { "name" : "value", "type" : "long" },
3359 { "name" : "next", "type" : "record" }
3360 ]
3361 }
3362 "#,
3363 )?;
3364
3365 let mut lookup = BTreeMap::new();
3366 lookup.insert("value".to_owned(), 0);
3367 lookup.insert("next".to_owned(), 1);
3368
3369 let expected = Schema::Record(RecordSchema {
3370 name: Name {
3371 name: "record".to_owned(),
3372 namespace: None,
3373 },
3374 aliases: None,
3375 doc: None,
3376 fields: vec![
3377 RecordField {
3378 name: "value".to_string(),
3379 doc: None,
3380 default: None,
3381 aliases: None,
3382 schema: Schema::Long,
3383 order: RecordFieldOrder::Ascending,
3384 position: 0,
3385 custom_attributes: Default::default(),
3386 },
3387 RecordField {
3388 name: "next".to_string(),
3389 doc: None,
3390 default: None,
3391 aliases: None,
3392 schema: Schema::Ref {
3393 name: Name {
3394 name: "record".to_owned(),
3395 namespace: None,
3396 },
3397 },
3398 order: RecordFieldOrder::Ascending,
3399 position: 1,
3400 custom_attributes: Default::default(),
3401 },
3402 ],
3403 lookup,
3404 attributes: Default::default(),
3405 });
3406 assert_eq!(schema, expected);
3407
3408 let canonical_form = &schema.canonical_form();
3409 let expected = r#"{"name":"record","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":"record"}]}"#;
3410 assert_eq!(canonical_form, &expected);
3411
3412 Ok(())
3413 }
3414
3415 #[test]
3416 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_enum() -> TestResult {
3417 let schema = Schema::parse_str(
3418 r#"
3419 {
3420 "type" : "record",
3421 "name" : "record",
3422 "fields" : [
3423 {
3424 "type" : "enum",
3425 "name" : "enum",
3426 "symbols": ["one", "two", "three"]
3427 },
3428 { "name" : "next", "type" : "enum" }
3429 ]
3430 }
3431 "#,
3432 )?;
3433
3434 let mut lookup = BTreeMap::new();
3435 lookup.insert("enum".to_owned(), 0);
3436 lookup.insert("next".to_owned(), 1);
3437
3438 let expected = Schema::Record(RecordSchema {
3439 name: Name {
3440 name: "record".to_owned(),
3441 namespace: None,
3442 },
3443 aliases: None,
3444 doc: None,
3445 fields: vec![
3446 RecordField {
3447 name: "enum".to_string(),
3448 doc: None,
3449 default: None,
3450 aliases: None,
3451 schema: Schema::Enum(
3452 EnumSchema::builder()
3453 .name(Name::new("enum")?)
3454 .symbols(vec![
3455 "one".to_string(),
3456 "two".to_string(),
3457 "three".to_string(),
3458 ])
3459 .build(),
3460 ),
3461 order: RecordFieldOrder::Ascending,
3462 position: 0,
3463 custom_attributes: Default::default(),
3464 },
3465 RecordField {
3466 name: "next".to_string(),
3467 doc: None,
3468 default: None,
3469 aliases: None,
3470 schema: Schema::Enum(EnumSchema {
3471 name: Name {
3472 name: "enum".to_owned(),
3473 namespace: None,
3474 },
3475 aliases: None,
3476 doc: None,
3477 symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
3478 default: None,
3479 attributes: Default::default(),
3480 }),
3481 order: RecordFieldOrder::Ascending,
3482 position: 1,
3483 custom_attributes: Default::default(),
3484 },
3485 ],
3486 lookup,
3487 attributes: Default::default(),
3488 });
3489 assert_eq!(schema, expected);
3490
3491 let canonical_form = &schema.canonical_form();
3492 let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#;
3493 assert_eq!(canonical_form, &expected);
3494
3495 Ok(())
3496 }
3497
3498 #[test]
3499 fn test_avro_3370_record_schema_with_currently_parsing_schema_named_fixed() -> TestResult {
3500 let schema = Schema::parse_str(
3501 r#"
3502 {
3503 "type" : "record",
3504 "name" : "record",
3505 "fields" : [
3506 {
3507 "type" : "fixed",
3508 "name" : "fixed",
3509 "size": 456
3510 },
3511 { "name" : "next", "type" : "fixed" }
3512 ]
3513 }
3514 "#,
3515 )?;
3516
3517 let mut lookup = BTreeMap::new();
3518 lookup.insert("fixed".to_owned(), 0);
3519 lookup.insert("next".to_owned(), 1);
3520
3521 let expected = Schema::Record(RecordSchema {
3522 name: Name {
3523 name: "record".to_owned(),
3524 namespace: None,
3525 },
3526 aliases: None,
3527 doc: None,
3528 fields: vec![
3529 RecordField {
3530 name: "fixed".to_string(),
3531 doc: None,
3532 default: None,
3533 aliases: None,
3534 schema: Schema::Fixed(FixedSchema {
3535 name: Name {
3536 name: "fixed".to_owned(),
3537 namespace: None,
3538 },
3539 aliases: None,
3540 doc: None,
3541 size: 456,
3542 default: None,
3543 attributes: Default::default(),
3544 }),
3545 order: RecordFieldOrder::Ascending,
3546 position: 0,
3547 custom_attributes: Default::default(),
3548 },
3549 RecordField {
3550 name: "next".to_string(),
3551 doc: None,
3552 default: None,
3553 aliases: None,
3554 schema: Schema::Fixed(FixedSchema {
3555 name: Name {
3556 name: "fixed".to_owned(),
3557 namespace: None,
3558 },
3559 aliases: None,
3560 doc: None,
3561 size: 456,
3562 default: None,
3563 attributes: Default::default(),
3564 }),
3565 order: RecordFieldOrder::Ascending,
3566 position: 1,
3567 custom_attributes: Default::default(),
3568 },
3569 ],
3570 lookup,
3571 attributes: Default::default(),
3572 });
3573 assert_eq!(schema, expected);
3574
3575 let canonical_form = &schema.canonical_form();
3576 let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#;
3577 assert_eq!(canonical_form, &expected);
3578
3579 Ok(())
3580 }
3581
3582 #[test]
3583 fn test_enum_schema() -> TestResult {
3584 let schema = Schema::parse_str(
3585 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,
3586 )?;
3587
3588 let expected = Schema::Enum(EnumSchema {
3589 name: Name::new("Suit")?,
3590 aliases: None,
3591 doc: None,
3592 symbols: vec![
3593 "diamonds".to_owned(),
3594 "spades".to_owned(),
3595 "clubs".to_owned(),
3596 "hearts".to_owned(),
3597 ],
3598 default: None,
3599 attributes: Default::default(),
3600 });
3601
3602 assert_eq!(expected, schema);
3603
3604 Ok(())
3605 }
3606
3607 #[test]
3608 fn test_enum_schema_duplicate() -> TestResult {
3609 let schema = Schema::parse_str(
3611 r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "diamonds"]}"#,
3612 );
3613 assert!(schema.is_err());
3614
3615 Ok(())
3616 }
3617
3618 #[test]
3619 fn test_enum_schema_name() -> TestResult {
3620 let schema = Schema::parse_str(
3622 r#"{"type": "enum", "name": "Enum", "symbols": ["0000", "variant"]}"#,
3623 );
3624 assert!(schema.is_err());
3625
3626 Ok(())
3627 }
3628
3629 #[test]
3630 fn test_fixed_schema() -> TestResult {
3631 let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#)?;
3632
3633 let expected = Schema::Fixed(FixedSchema {
3634 name: Name::new("test")?,
3635 aliases: None,
3636 doc: None,
3637 size: 16_usize,
3638 default: None,
3639 attributes: Default::default(),
3640 });
3641
3642 assert_eq!(expected, schema);
3643
3644 Ok(())
3645 }
3646
3647 #[test]
3648 fn test_fixed_schema_with_documentation() -> TestResult {
3649 let schema = Schema::parse_str(
3650 r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#,
3651 )?;
3652
3653 let expected = Schema::Fixed(FixedSchema {
3654 name: Name::new("test")?,
3655 aliases: None,
3656 doc: Some(String::from("FixedSchema documentation")),
3657 size: 16_usize,
3658 default: None,
3659 attributes: Default::default(),
3660 });
3661
3662 assert_eq!(expected, schema);
3663
3664 Ok(())
3665 }
3666
3667 #[test]
3668 fn test_no_documentation() -> TestResult {
3669 let schema = Schema::parse_str(
3670 r#"{"type": "enum", "name": "Coin", "symbols": ["heads", "tails"]}"#,
3671 )?;
3672
3673 let doc = match schema {
3674 Schema::Enum(EnumSchema { doc, .. }) => doc,
3675 _ => unreachable!(),
3676 };
3677
3678 assert!(doc.is_none());
3679
3680 Ok(())
3681 }
3682
3683 #[test]
3684 fn test_documentation() -> TestResult {
3685 let schema = Schema::parse_str(
3686 r#"{"type": "enum", "name": "Coin", "doc": "Some documentation", "symbols": ["heads", "tails"]}"#,
3687 )?;
3688
3689 let doc = match schema {
3690 Schema::Enum(EnumSchema { doc, .. }) => doc,
3691 _ => None,
3692 };
3693
3694 assert_eq!("Some documentation".to_owned(), doc.unwrap());
3695
3696 Ok(())
3697 }
3698
3699 #[test]
3702 fn test_schema_is_send() {
3703 fn send<S: Send>(_s: S) {}
3704
3705 let schema = Schema::Null;
3706 send(schema);
3707 }
3708
3709 #[test]
3710 fn test_schema_is_sync() {
3711 fn sync<S: Sync>(_s: S) {}
3712
3713 let schema = Schema::Null;
3714 sync(&schema);
3715 sync(schema);
3716 }
3717
3718 #[test]
3719 fn test_schema_fingerprint() -> TestResult {
3720 use crate::rabin::Rabin;
3721 use md5::Md5;
3722 use sha2::Sha256;
3723
3724 let raw_schema = r#"
3725 {
3726 "type": "record",
3727 "name": "test",
3728 "fields": [
3729 {"name": "a", "type": "long", "default": 42},
3730 {"name": "b", "type": "string"},
3731 {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
3732 ]
3733 }
3734"#;
3735
3736 let schema = Schema::parse_str(raw_schema)?;
3737 assert_eq!(
3738 "7eb3b28d73dfc99bdd9af1848298b40804a2f8ad5d2642be2ecc2ad34842b987",
3739 format!("{}", schema.fingerprint::<Sha256>())
3740 );
3741
3742 assert_eq!(
3743 "cb11615e412ee5d872620d8df78ff6ae",
3744 format!("{}", schema.fingerprint::<Md5>())
3745 );
3746 assert_eq!(
3747 "92f2ccef718c6754",
3748 format!("{}", schema.fingerprint::<Rabin>())
3749 );
3750
3751 Ok(())
3752 }
3753
3754 #[test]
3755 fn test_logical_types() -> TestResult {
3756 let schema = Schema::parse_str(r#"{"type": "int", "logicalType": "date"}"#)?;
3757 assert_eq!(schema, Schema::Date);
3758
3759 let schema = Schema::parse_str(r#"{"type": "long", "logicalType": "timestamp-micros"}"#)?;
3760 assert_eq!(schema, Schema::TimestampMicros);
3761
3762 Ok(())
3763 }
3764
3765 #[test]
3766 fn test_nullable_logical_type() -> TestResult {
3767 let schema = Schema::parse_str(
3768 r#"{"type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}"#,
3769 )?;
3770 assert_eq!(
3771 schema,
3772 Schema::Union(UnionSchema::new(vec![
3773 Schema::Null,
3774 Schema::TimestampMicros,
3775 ])?)
3776 );
3777
3778 Ok(())
3779 }
3780
3781 #[test]
3782 fn record_field_order_from_str() -> TestResult {
3783 use std::str::FromStr;
3784
3785 assert_eq!(
3786 RecordFieldOrder::from_str("ascending").unwrap(),
3787 RecordFieldOrder::Ascending
3788 );
3789 assert_eq!(
3790 RecordFieldOrder::from_str("descending").unwrap(),
3791 RecordFieldOrder::Descending
3792 );
3793 assert_eq!(
3794 RecordFieldOrder::from_str("ignore").unwrap(),
3795 RecordFieldOrder::Ignore
3796 );
3797 assert!(RecordFieldOrder::from_str("not an ordering").is_err());
3798
3799 Ok(())
3800 }
3801
3802 #[test]
3803 fn test_avro_3374_preserve_namespace_for_primitive() -> TestResult {
3804 let schema = Schema::parse_str(
3805 r#"
3806 {
3807 "type" : "record",
3808 "name" : "ns.int",
3809 "fields" : [
3810 {"name" : "value", "type" : "int"},
3811 {"name" : "next", "type" : [ "null", "ns.int" ]}
3812 ]
3813 }
3814 "#,
3815 )?;
3816
3817 let json = schema.canonical_form();
3818 assert_eq!(
3819 json,
3820 r#"{"name":"ns.int","type":"record","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","ns.int"]}]}"#
3821 );
3822
3823 Ok(())
3824 }
3825
3826 #[test]
3827 fn test_avro_3433_preserve_schema_refs_in_json() -> TestResult {
3828 let schema = r#"
3829 {
3830 "name": "test.test",
3831 "type": "record",
3832 "fields": [
3833 {
3834 "name": "bar",
3835 "type": { "name": "test.foo", "type": "record", "fields": [{ "name": "id", "type": "long" }] }
3836 },
3837 { "name": "baz", "type": "test.foo" }
3838 ]
3839 }
3840 "#;
3841
3842 let schema = Schema::parse_str(schema)?;
3843
3844 let expected = r#"{"name":"test.test","type":"record","fields":[{"name":"bar","type":{"name":"test.foo","type":"record","fields":[{"name":"id","type":"long"}]}},{"name":"baz","type":"test.foo"}]}"#;
3845 assert_eq!(schema.canonical_form(), expected);
3846
3847 Ok(())
3848 }
3849
3850 #[test]
3851 fn test_read_namespace_from_name() -> TestResult {
3852 let schema = r#"
3853 {
3854 "name": "space.name",
3855 "type": "record",
3856 "fields": [
3857 {
3858 "name": "num",
3859 "type": "int"
3860 }
3861 ]
3862 }
3863 "#;
3864
3865 let schema = Schema::parse_str(schema)?;
3866 if let Schema::Record(RecordSchema { name, .. }) = schema {
3867 assert_eq!(name.name, "name");
3868 assert_eq!(name.namespace, Some("space".to_string()));
3869 } else {
3870 panic!("Expected a record schema!");
3871 }
3872
3873 Ok(())
3874 }
3875
3876 #[test]
3877 fn test_namespace_from_name_has_priority_over_from_field() -> TestResult {
3878 let schema = r#"
3879 {
3880 "name": "space1.name",
3881 "namespace": "space2",
3882 "type": "record",
3883 "fields": [
3884 {
3885 "name": "num",
3886 "type": "int"
3887 }
3888 ]
3889 }
3890 "#;
3891
3892 let schema = Schema::parse_str(schema)?;
3893 if let Schema::Record(RecordSchema { name, .. }) = schema {
3894 assert_eq!(name.namespace, Some("space1".to_string()));
3895 } else {
3896 panic!("Expected a record schema!");
3897 }
3898
3899 Ok(())
3900 }
3901
3902 #[test]
3903 fn test_namespace_from_field() -> TestResult {
3904 let schema = r#"
3905 {
3906 "name": "name",
3907 "namespace": "space2",
3908 "type": "record",
3909 "fields": [
3910 {
3911 "name": "num",
3912 "type": "int"
3913 }
3914 ]
3915 }
3916 "#;
3917
3918 let schema = Schema::parse_str(schema)?;
3919 if let Schema::Record(RecordSchema { name, .. }) = schema {
3920 assert_eq!(name.namespace, Some("space2".to_string()));
3921 } else {
3922 panic!("Expected a record schema!");
3923 }
3924
3925 Ok(())
3926 }
3927
3928 #[test]
3929 fn test_namespace_from_name_with_empty_value() -> TestResult {
3931 let name = Name::new(".name")?;
3932 assert_eq!(name.name, "name");
3933 assert_eq!(name.namespace, None);
3934
3935 Ok(())
3936 }
3937
3938 #[test]
3939 fn test_name_with_whitespace_value() {
3941 match Name::new(" ") {
3942 Err(Error::InvalidSchemaName(_, _)) => {}
3943 _ => panic!("Expected an Error::InvalidSchemaName!"),
3944 }
3945 }
3946
3947 #[test]
3948 fn test_name_with_no_name_part() {
3950 match Name::new("space.") {
3951 Err(Error::InvalidSchemaName(_, _)) => {}
3952 _ => panic!("Expected an Error::InvalidSchemaName!"),
3953 }
3954 }
3955
3956 #[test]
3957 fn avro_3448_test_proper_resolution_inner_record_inherited_namespace() -> TestResult {
3958 let schema = r#"
3959 {
3960 "name": "record_name",
3961 "namespace": "space",
3962 "type": "record",
3963 "fields": [
3964 {
3965 "name": "outer_field_1",
3966 "type": [
3967 "null",
3968 {
3969 "type":"record",
3970 "name":"inner_record_name",
3971 "fields":[
3972 {
3973 "name":"inner_field_1",
3974 "type":"double"
3975 }
3976 ]
3977 }
3978 ]
3979 },
3980 {
3981 "name": "outer_field_2",
3982 "type" : "inner_record_name"
3983 }
3984 ]
3985 }
3986 "#;
3987 let schema = Schema::parse_str(schema)?;
3988 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
3989 assert_eq!(rs.get_names().len(), 2);
3990 for s in &["space.record_name", "space.inner_record_name"] {
3991 assert!(rs.get_names().contains_key(&Name::new(s)?));
3992 }
3993
3994 Ok(())
3995 }
3996
3997 #[test]
3998 fn avro_3448_test_proper_resolution_inner_record_qualified_namespace() -> TestResult {
3999 let schema = r#"
4000 {
4001 "name": "record_name",
4002 "namespace": "space",
4003 "type": "record",
4004 "fields": [
4005 {
4006 "name": "outer_field_1",
4007 "type": [
4008 "null",
4009 {
4010 "type":"record",
4011 "name":"inner_record_name",
4012 "fields":[
4013 {
4014 "name":"inner_field_1",
4015 "type":"double"
4016 }
4017 ]
4018 }
4019 ]
4020 },
4021 {
4022 "name": "outer_field_2",
4023 "type" : "space.inner_record_name"
4024 }
4025 ]
4026 }
4027 "#;
4028 let schema = Schema::parse_str(schema)?;
4029 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4030 assert_eq!(rs.get_names().len(), 2);
4031 for s in &["space.record_name", "space.inner_record_name"] {
4032 assert!(rs.get_names().contains_key(&Name::new(s)?));
4033 }
4034
4035 Ok(())
4036 }
4037
4038 #[test]
4039 fn avro_3448_test_proper_resolution_inner_enum_inherited_namespace() -> TestResult {
4040 let schema = r#"
4041 {
4042 "name": "record_name",
4043 "namespace": "space",
4044 "type": "record",
4045 "fields": [
4046 {
4047 "name": "outer_field_1",
4048 "type": [
4049 "null",
4050 {
4051 "type":"enum",
4052 "name":"inner_enum_name",
4053 "symbols":["Extensive","Testing"]
4054 }
4055 ]
4056 },
4057 {
4058 "name": "outer_field_2",
4059 "type" : "inner_enum_name"
4060 }
4061 ]
4062 }
4063 "#;
4064 let schema = Schema::parse_str(schema)?;
4065 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4066 assert_eq!(rs.get_names().len(), 2);
4067 for s in &["space.record_name", "space.inner_enum_name"] {
4068 assert!(rs.get_names().contains_key(&Name::new(s)?));
4069 }
4070
4071 Ok(())
4072 }
4073
4074 #[test]
4075 fn avro_3448_test_proper_resolution_inner_enum_qualified_namespace() -> TestResult {
4076 let schema = r#"
4077 {
4078 "name": "record_name",
4079 "namespace": "space",
4080 "type": "record",
4081 "fields": [
4082 {
4083 "name": "outer_field_1",
4084 "type": [
4085 "null",
4086 {
4087 "type":"enum",
4088 "name":"inner_enum_name",
4089 "symbols":["Extensive","Testing"]
4090 }
4091 ]
4092 },
4093 {
4094 "name": "outer_field_2",
4095 "type" : "space.inner_enum_name"
4096 }
4097 ]
4098 }
4099 "#;
4100 let schema = Schema::parse_str(schema)?;
4101 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4102 assert_eq!(rs.get_names().len(), 2);
4103 for s in &["space.record_name", "space.inner_enum_name"] {
4104 assert!(rs.get_names().contains_key(&Name::new(s)?));
4105 }
4106
4107 Ok(())
4108 }
4109
4110 #[test]
4111 fn avro_3448_test_proper_resolution_inner_fixed_inherited_namespace() -> TestResult {
4112 let schema = r#"
4113 {
4114 "name": "record_name",
4115 "namespace": "space",
4116 "type": "record",
4117 "fields": [
4118 {
4119 "name": "outer_field_1",
4120 "type": [
4121 "null",
4122 {
4123 "type":"fixed",
4124 "name":"inner_fixed_name",
4125 "size": 16
4126 }
4127 ]
4128 },
4129 {
4130 "name": "outer_field_2",
4131 "type" : "inner_fixed_name"
4132 }
4133 ]
4134 }
4135 "#;
4136 let schema = Schema::parse_str(schema)?;
4137 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4138 assert_eq!(rs.get_names().len(), 2);
4139 for s in &["space.record_name", "space.inner_fixed_name"] {
4140 assert!(rs.get_names().contains_key(&Name::new(s)?));
4141 }
4142
4143 Ok(())
4144 }
4145
4146 #[test]
4147 fn avro_3448_test_proper_resolution_inner_fixed_qualified_namespace() -> TestResult {
4148 let schema = r#"
4149 {
4150 "name": "record_name",
4151 "namespace": "space",
4152 "type": "record",
4153 "fields": [
4154 {
4155 "name": "outer_field_1",
4156 "type": [
4157 "null",
4158 {
4159 "type":"fixed",
4160 "name":"inner_fixed_name",
4161 "size": 16
4162 }
4163 ]
4164 },
4165 {
4166 "name": "outer_field_2",
4167 "type" : "space.inner_fixed_name"
4168 }
4169 ]
4170 }
4171 "#;
4172 let schema = Schema::parse_str(schema)?;
4173 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4174 assert_eq!(rs.get_names().len(), 2);
4175 for s in &["space.record_name", "space.inner_fixed_name"] {
4176 assert!(rs.get_names().contains_key(&Name::new(s)?));
4177 }
4178
4179 Ok(())
4180 }
4181
4182 #[test]
4183 fn avro_3448_test_proper_resolution_inner_record_inner_namespace() -> TestResult {
4184 let schema = r#"
4185 {
4186 "name": "record_name",
4187 "namespace": "space",
4188 "type": "record",
4189 "fields": [
4190 {
4191 "name": "outer_field_1",
4192 "type": [
4193 "null",
4194 {
4195 "type":"record",
4196 "name":"inner_record_name",
4197 "namespace":"inner_space",
4198 "fields":[
4199 {
4200 "name":"inner_field_1",
4201 "type":"double"
4202 }
4203 ]
4204 }
4205 ]
4206 },
4207 {
4208 "name": "outer_field_2",
4209 "type" : "inner_space.inner_record_name"
4210 }
4211 ]
4212 }
4213 "#;
4214 let schema = Schema::parse_str(schema)?;
4215 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4216 assert_eq!(rs.get_names().len(), 2);
4217 for s in &["space.record_name", "inner_space.inner_record_name"] {
4218 assert!(rs.get_names().contains_key(&Name::new(s)?));
4219 }
4220
4221 Ok(())
4222 }
4223
4224 #[test]
4225 fn avro_3448_test_proper_resolution_inner_enum_inner_namespace() -> TestResult {
4226 let schema = r#"
4227 {
4228 "name": "record_name",
4229 "namespace": "space",
4230 "type": "record",
4231 "fields": [
4232 {
4233 "name": "outer_field_1",
4234 "type": [
4235 "null",
4236 {
4237 "type":"enum",
4238 "name":"inner_enum_name",
4239 "namespace": "inner_space",
4240 "symbols":["Extensive","Testing"]
4241 }
4242 ]
4243 },
4244 {
4245 "name": "outer_field_2",
4246 "type" : "inner_space.inner_enum_name"
4247 }
4248 ]
4249 }
4250 "#;
4251 let schema = Schema::parse_str(schema)?;
4252 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4253 assert_eq!(rs.get_names().len(), 2);
4254 for s in &["space.record_name", "inner_space.inner_enum_name"] {
4255 assert!(rs.get_names().contains_key(&Name::new(s)?));
4256 }
4257
4258 Ok(())
4259 }
4260
4261 #[test]
4262 fn avro_3448_test_proper_resolution_inner_fixed_inner_namespace() -> TestResult {
4263 let schema = r#"
4264 {
4265 "name": "record_name",
4266 "namespace": "space",
4267 "type": "record",
4268 "fields": [
4269 {
4270 "name": "outer_field_1",
4271 "type": [
4272 "null",
4273 {
4274 "type":"fixed",
4275 "name":"inner_fixed_name",
4276 "namespace": "inner_space",
4277 "size": 16
4278 }
4279 ]
4280 },
4281 {
4282 "name": "outer_field_2",
4283 "type" : "inner_space.inner_fixed_name"
4284 }
4285 ]
4286 }
4287 "#;
4288 let schema = Schema::parse_str(schema)?;
4289 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4290 assert_eq!(rs.get_names().len(), 2);
4291 for s in &["space.record_name", "inner_space.inner_fixed_name"] {
4292 assert!(rs.get_names().contains_key(&Name::new(s)?));
4293 }
4294
4295 Ok(())
4296 }
4297
4298 #[test]
4299 fn avro_3448_test_proper_multi_level_resolution_inner_record_outer_namespace() -> TestResult {
4300 let schema = r#"
4301 {
4302 "name": "record_name",
4303 "namespace": "space",
4304 "type": "record",
4305 "fields": [
4306 {
4307 "name": "outer_field_1",
4308 "type": [
4309 "null",
4310 {
4311 "type":"record",
4312 "name":"middle_record_name",
4313 "fields":[
4314 {
4315 "name":"middle_field_1",
4316 "type":[
4317 "null",
4318 {
4319 "type":"record",
4320 "name":"inner_record_name",
4321 "fields":[
4322 {
4323 "name":"inner_field_1",
4324 "type":"double"
4325 }
4326 ]
4327 }
4328 ]
4329 }
4330 ]
4331 }
4332 ]
4333 },
4334 {
4335 "name": "outer_field_2",
4336 "type" : "space.inner_record_name"
4337 }
4338 ]
4339 }
4340 "#;
4341 let schema = Schema::parse_str(schema)?;
4342 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4343 assert_eq!(rs.get_names().len(), 3);
4344 for s in &[
4345 "space.record_name",
4346 "space.middle_record_name",
4347 "space.inner_record_name",
4348 ] {
4349 assert!(rs.get_names().contains_key(&Name::new(s)?));
4350 }
4351
4352 Ok(())
4353 }
4354
4355 #[test]
4356 fn avro_3448_test_proper_multi_level_resolution_inner_record_middle_namespace() -> TestResult {
4357 let schema = r#"
4358 {
4359 "name": "record_name",
4360 "namespace": "space",
4361 "type": "record",
4362 "fields": [
4363 {
4364 "name": "outer_field_1",
4365 "type": [
4366 "null",
4367 {
4368 "type":"record",
4369 "name":"middle_record_name",
4370 "namespace":"middle_namespace",
4371 "fields":[
4372 {
4373 "name":"middle_field_1",
4374 "type":[
4375 "null",
4376 {
4377 "type":"record",
4378 "name":"inner_record_name",
4379 "fields":[
4380 {
4381 "name":"inner_field_1",
4382 "type":"double"
4383 }
4384 ]
4385 }
4386 ]
4387 }
4388 ]
4389 }
4390 ]
4391 },
4392 {
4393 "name": "outer_field_2",
4394 "type" : "middle_namespace.inner_record_name"
4395 }
4396 ]
4397 }
4398 "#;
4399 let schema = Schema::parse_str(schema)?;
4400 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4401 assert_eq!(rs.get_names().len(), 3);
4402 for s in &[
4403 "space.record_name",
4404 "middle_namespace.middle_record_name",
4405 "middle_namespace.inner_record_name",
4406 ] {
4407 assert!(rs.get_names().contains_key(&Name::new(s)?));
4408 }
4409
4410 Ok(())
4411 }
4412
4413 #[test]
4414 fn avro_3448_test_proper_multi_level_resolution_inner_record_inner_namespace() -> TestResult {
4415 let schema = r#"
4416 {
4417 "name": "record_name",
4418 "namespace": "space",
4419 "type": "record",
4420 "fields": [
4421 {
4422 "name": "outer_field_1",
4423 "type": [
4424 "null",
4425 {
4426 "type":"record",
4427 "name":"middle_record_name",
4428 "namespace":"middle_namespace",
4429 "fields":[
4430 {
4431 "name":"middle_field_1",
4432 "type":[
4433 "null",
4434 {
4435 "type":"record",
4436 "name":"inner_record_name",
4437 "namespace":"inner_namespace",
4438 "fields":[
4439 {
4440 "name":"inner_field_1",
4441 "type":"double"
4442 }
4443 ]
4444 }
4445 ]
4446 }
4447 ]
4448 }
4449 ]
4450 },
4451 {
4452 "name": "outer_field_2",
4453 "type" : "inner_namespace.inner_record_name"
4454 }
4455 ]
4456 }
4457 "#;
4458 let schema = Schema::parse_str(schema)?;
4459 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4460 assert_eq!(rs.get_names().len(), 3);
4461 for s in &[
4462 "space.record_name",
4463 "middle_namespace.middle_record_name",
4464 "inner_namespace.inner_record_name",
4465 ] {
4466 assert!(rs.get_names().contains_key(&Name::new(s)?));
4467 }
4468
4469 Ok(())
4470 }
4471
4472 #[test]
4473 fn avro_3448_test_proper_in_array_resolution_inherited_namespace() -> TestResult {
4474 let schema = r#"
4475 {
4476 "name": "record_name",
4477 "namespace": "space",
4478 "type": "record",
4479 "fields": [
4480 {
4481 "name": "outer_field_1",
4482 "type": {
4483 "type":"array",
4484 "items":{
4485 "type":"record",
4486 "name":"in_array_record",
4487 "fields": [
4488 {
4489 "name":"array_record_field",
4490 "type":"string"
4491 }
4492 ]
4493 }
4494 }
4495 },
4496 {
4497 "name":"outer_field_2",
4498 "type":"in_array_record"
4499 }
4500 ]
4501 }
4502 "#;
4503 let schema = Schema::parse_str(schema)?;
4504 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4505 assert_eq!(rs.get_names().len(), 2);
4506 for s in &["space.record_name", "space.in_array_record"] {
4507 assert!(rs.get_names().contains_key(&Name::new(s)?));
4508 }
4509
4510 Ok(())
4511 }
4512
4513 #[test]
4514 fn avro_3448_test_proper_in_map_resolution_inherited_namespace() -> TestResult {
4515 let schema = r#"
4516 {
4517 "name": "record_name",
4518 "namespace": "space",
4519 "type": "record",
4520 "fields": [
4521 {
4522 "name": "outer_field_1",
4523 "type": {
4524 "type":"map",
4525 "values":{
4526 "type":"record",
4527 "name":"in_map_record",
4528 "fields": [
4529 {
4530 "name":"map_record_field",
4531 "type":"string"
4532 }
4533 ]
4534 }
4535 }
4536 },
4537 {
4538 "name":"outer_field_2",
4539 "type":"in_map_record"
4540 }
4541 ]
4542 }
4543 "#;
4544 let schema = Schema::parse_str(schema)?;
4545 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4546 assert_eq!(rs.get_names().len(), 2);
4547 for s in &["space.record_name", "space.in_map_record"] {
4548 assert!(rs.get_names().contains_key(&Name::new(s)?));
4549 }
4550
4551 Ok(())
4552 }
4553
4554 #[test]
4555 fn avro_3466_test_to_json_inner_enum_inner_namespace() -> TestResult {
4556 let schema = r#"
4557 {
4558 "name": "record_name",
4559 "namespace": "space",
4560 "type": "record",
4561 "fields": [
4562 {
4563 "name": "outer_field_1",
4564 "type": [
4565 "null",
4566 {
4567 "type":"enum",
4568 "name":"inner_enum_name",
4569 "namespace": "inner_space",
4570 "symbols":["Extensive","Testing"]
4571 }
4572 ]
4573 },
4574 {
4575 "name": "outer_field_2",
4576 "type" : "inner_space.inner_enum_name"
4577 }
4578 ]
4579 }
4580 "#;
4581 let schema = Schema::parse_str(schema)?;
4582 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4583
4584 assert_eq!(rs.get_names().len(), 2);
4586 for s in &["space.record_name", "inner_space.inner_enum_name"] {
4587 assert!(rs.get_names().contains_key(&Name::new(s)?));
4588 }
4589
4590 let schema_str = serde_json::to_string(&schema).expect("test failed");
4592 let _schema = Schema::parse_str(&schema_str).expect("test failed");
4593 assert_eq!(schema, _schema);
4594
4595 Ok(())
4596 }
4597
4598 #[test]
4599 fn avro_3466_test_to_json_inner_fixed_inner_namespace() -> TestResult {
4600 let schema = r#"
4601 {
4602 "name": "record_name",
4603 "namespace": "space",
4604 "type": "record",
4605 "fields": [
4606 {
4607 "name": "outer_field_1",
4608 "type": [
4609 "null",
4610 {
4611 "type":"fixed",
4612 "name":"inner_fixed_name",
4613 "namespace": "inner_space",
4614 "size":54
4615 }
4616 ]
4617 },
4618 {
4619 "name": "outer_field_2",
4620 "type" : "inner_space.inner_fixed_name"
4621 }
4622 ]
4623 }
4624 "#;
4625 let schema = Schema::parse_str(schema)?;
4626 let rs = ResolvedSchema::try_from(&schema).expect("Schema didn't successfully parse");
4627
4628 assert_eq!(rs.get_names().len(), 2);
4630 for s in &["space.record_name", "inner_space.inner_fixed_name"] {
4631 assert!(rs.get_names().contains_key(&Name::new(s)?));
4632 }
4633
4634 let schema_str = serde_json::to_string(&schema).expect("test failed");
4636 let _schema = Schema::parse_str(&schema_str).expect("test failed");
4637 assert_eq!(schema, _schema);
4638
4639 Ok(())
4640 }
4641
4642 fn assert_avro_3512_aliases(aliases: &Aliases) {
4643 match aliases {
4644 Some(aliases) => {
4645 assert_eq!(aliases.len(), 3);
4646 assert_eq!(aliases[0], Alias::new("space.b").unwrap());
4647 assert_eq!(aliases[1], Alias::new("x.y").unwrap());
4648 assert_eq!(aliases[2], Alias::new(".c").unwrap());
4649 }
4650 None => {
4651 panic!("'aliases' must be Some");
4652 }
4653 }
4654 }
4655
4656 #[test]
4657 fn avro_3512_alias_with_null_namespace_record() -> TestResult {
4658 let schema = Schema::parse_str(
4659 r#"
4660 {
4661 "type": "record",
4662 "name": "a",
4663 "namespace": "space",
4664 "aliases": ["b", "x.y", ".c"],
4665 "fields" : [
4666 {"name": "time", "type": "long"}
4667 ]
4668 }
4669 "#,
4670 )?;
4671
4672 if let Schema::Record(RecordSchema { ref aliases, .. }) = schema {
4673 assert_avro_3512_aliases(aliases);
4674 } else {
4675 panic!("The Schema should be a record: {schema:?}");
4676 }
4677
4678 Ok(())
4679 }
4680
4681 #[test]
4682 fn avro_3512_alias_with_null_namespace_enum() -> TestResult {
4683 let schema = Schema::parse_str(
4684 r#"
4685 {
4686 "type": "enum",
4687 "name": "a",
4688 "namespace": "space",
4689 "aliases": ["b", "x.y", ".c"],
4690 "symbols" : [
4691 "symbol1", "symbol2"
4692 ]
4693 }
4694 "#,
4695 )?;
4696
4697 if let Schema::Enum(EnumSchema { ref aliases, .. }) = schema {
4698 assert_avro_3512_aliases(aliases);
4699 } else {
4700 panic!("The Schema should be an enum: {schema:?}");
4701 }
4702
4703 Ok(())
4704 }
4705
4706 #[test]
4707 fn avro_3512_alias_with_null_namespace_fixed() -> TestResult {
4708 let schema = Schema::parse_str(
4709 r#"
4710 {
4711 "type": "fixed",
4712 "name": "a",
4713 "namespace": "space",
4714 "aliases": ["b", "x.y", ".c"],
4715 "size" : 12
4716 }
4717 "#,
4718 )?;
4719
4720 if let Schema::Fixed(FixedSchema { ref aliases, .. }) = schema {
4721 assert_avro_3512_aliases(aliases);
4722 } else {
4723 panic!("The Schema should be a fixed: {schema:?}");
4724 }
4725
4726 Ok(())
4727 }
4728
4729 #[test]
4730 fn avro_3518_serialize_aliases_record() -> TestResult {
4731 let schema = Schema::parse_str(
4732 r#"
4733 {
4734 "type": "record",
4735 "name": "a",
4736 "namespace": "space",
4737 "aliases": ["b", "x.y", ".c"],
4738 "fields" : [
4739 {
4740 "name": "time",
4741 "type": "long",
4742 "doc": "The documentation is not serialized",
4743 "default": 123,
4744 "aliases": ["time1", "ns.time2"]
4745 }
4746 ]
4747 }
4748 "#,
4749 )?;
4750
4751 let value = serde_json::to_value(&schema)?;
4752 let serialized = serde_json::to_string(&value)?;
4753 assert_eq!(
4754 r#"{"aliases":["space.b","x.y","c"],"fields":[{"aliases":["time1","ns.time2"],"default":123,"name":"time","type":"long"}],"name":"a","namespace":"space","type":"record"}"#,
4755 &serialized
4756 );
4757 assert_eq!(schema, Schema::parse_str(&serialized)?);
4758
4759 Ok(())
4760 }
4761
4762 #[test]
4763 fn avro_3518_serialize_aliases_enum() -> TestResult {
4764 let schema = Schema::parse_str(
4765 r#"
4766 {
4767 "type": "enum",
4768 "name": "a",
4769 "namespace": "space",
4770 "aliases": ["b", "x.y", ".c"],
4771 "symbols" : [
4772 "symbol1", "symbol2"
4773 ]
4774 }
4775 "#,
4776 )?;
4777
4778 let value = serde_json::to_value(&schema)?;
4779 let serialized = serde_json::to_string(&value)?;
4780 assert_eq!(
4781 r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","symbols":["symbol1","symbol2"],"type":"enum"}"#,
4782 &serialized
4783 );
4784 assert_eq!(schema, Schema::parse_str(&serialized)?);
4785
4786 Ok(())
4787 }
4788
4789 #[test]
4790 fn avro_3518_serialize_aliases_fixed() -> TestResult {
4791 let schema = Schema::parse_str(
4792 r#"
4793 {
4794 "type": "fixed",
4795 "name": "a",
4796 "namespace": "space",
4797 "aliases": ["b", "x.y", ".c"],
4798 "size" : 12
4799 }
4800 "#,
4801 )?;
4802
4803 let value = serde_json::to_value(&schema)?;
4804 let serialized = serde_json::to_string(&value)?;
4805 assert_eq!(
4806 r#"{"aliases":["space.b","x.y","c"],"name":"a","namespace":"space","size":12,"type":"fixed"}"#,
4807 &serialized
4808 );
4809 assert_eq!(schema, Schema::parse_str(&serialized)?);
4810
4811 Ok(())
4812 }
4813
4814 #[test]
4815 fn avro_3130_parse_anonymous_union_type() -> TestResult {
4816 let schema_str = r#"
4817 {
4818 "type": "record",
4819 "name": "AccountEvent",
4820 "fields": [
4821 {"type":
4822 ["null",
4823 { "name": "accountList",
4824 "type": {
4825 "type": "array",
4826 "items": "long"
4827 }
4828 }
4829 ],
4830 "name":"NullableLongArray"
4831 }
4832 ]
4833 }
4834 "#;
4835 let schema = Schema::parse_str(schema_str)?;
4836
4837 if let Schema::Record(RecordSchema { name, fields, .. }) = schema {
4838 assert_eq!(name, Name::new("AccountEvent")?);
4839
4840 let field = &fields[0];
4841 assert_eq!(&field.name, "NullableLongArray");
4842
4843 if let Schema::Union(ref union) = field.schema {
4844 assert_eq!(union.schemas[0], Schema::Null);
4845
4846 if let Schema::Array(ref array_schema) = union.schemas[1] {
4847 if let Schema::Long = *array_schema.items {
4848 } else {
4850 panic!("Expected a Schema::Array of type Long");
4851 }
4852 } else {
4853 panic!("Expected Schema::Array");
4854 }
4855 } else {
4856 panic!("Expected Schema::Union");
4857 }
4858 } else {
4859 panic!("Expected Schema::Record");
4860 }
4861
4862 Ok(())
4863 }
4864
4865 #[test]
4866 fn avro_custom_attributes_schema_without_attributes() -> TestResult {
4867 let schemata_str = [
4868 r#"
4869 {
4870 "type": "record",
4871 "name": "Rec",
4872 "doc": "A Record schema without custom attributes",
4873 "fields": []
4874 }
4875 "#,
4876 r#"
4877 {
4878 "type": "enum",
4879 "name": "Enum",
4880 "doc": "An Enum schema without custom attributes",
4881 "symbols": []
4882 }
4883 "#,
4884 r#"
4885 {
4886 "type": "fixed",
4887 "name": "Fixed",
4888 "doc": "A Fixed schema without custom attributes",
4889 "size": 0
4890 }
4891 "#,
4892 ];
4893 for schema_str in schemata_str.iter() {
4894 let schema = Schema::parse_str(schema_str)?;
4895 assert_eq!(schema.custom_attributes(), Some(&Default::default()));
4896 }
4897
4898 Ok(())
4899 }
4900
4901 const CUSTOM_ATTRS_SUFFIX: &str = r#"
4902 "string_key": "value",
4903 "number_key": 1.23,
4904 "null_key": null,
4905 "array_key": [1, 2, 3],
4906 "object_key": {
4907 "key": "value"
4908 }
4909 "#;
4910
4911 #[test]
4912 fn avro_3609_custom_attributes_schema_with_attributes() -> TestResult {
4913 let schemata_str = [
4914 r#"
4915 {
4916 "type": "record",
4917 "name": "Rec",
4918 "namespace": "ns",
4919 "doc": "A Record schema with custom attributes",
4920 "fields": [],
4921 {{{}}}
4922 }
4923 "#,
4924 r#"
4925 {
4926 "type": "enum",
4927 "name": "Enum",
4928 "namespace": "ns",
4929 "doc": "An Enum schema with custom attributes",
4930 "symbols": [],
4931 {{{}}}
4932 }
4933 "#,
4934 r#"
4935 {
4936 "type": "fixed",
4937 "name": "Fixed",
4938 "namespace": "ns",
4939 "doc": "A Fixed schema with custom attributes",
4940 "size": 2,
4941 {{{}}}
4942 }
4943 "#,
4944 ];
4945
4946 for schema_str in schemata_str.iter() {
4947 let schema = Schema::parse_str(
4948 schema_str
4949 .to_owned()
4950 .replace("{{{}}}", CUSTOM_ATTRS_SUFFIX)
4951 .as_str(),
4952 )?;
4953
4954 assert_eq!(
4955 schema.custom_attributes(),
4956 Some(&expected_custom_attributes())
4957 );
4958 }
4959
4960 Ok(())
4961 }
4962
4963 fn expected_custom_attributes() -> BTreeMap<String, Value> {
4964 let mut expected_attributes: BTreeMap<String, Value> = Default::default();
4965 expected_attributes.insert("string_key".to_string(), Value::String("value".to_string()));
4966 expected_attributes.insert("number_key".to_string(), json!(1.23));
4967 expected_attributes.insert("null_key".to_string(), Value::Null);
4968 expected_attributes.insert(
4969 "array_key".to_string(),
4970 Value::Array(vec![json!(1), json!(2), json!(3)]),
4971 );
4972 let mut object_value: HashMap<String, Value> = HashMap::new();
4973 object_value.insert("key".to_string(), Value::String("value".to_string()));
4974 expected_attributes.insert("object_key".to_string(), json!(object_value));
4975 expected_attributes
4976 }
4977
4978 #[test]
4979 fn avro_3609_custom_attributes_record_field_without_attributes() -> TestResult {
4980 let schema_str = String::from(
4981 r#"
4982 {
4983 "type": "record",
4984 "name": "Rec",
4985 "doc": "A Record schema without custom attributes",
4986 "fields": [
4987 {
4988 "name": "field_one",
4989 "type": "float",
4990 {{{}}}
4991 }
4992 ]
4993 }
4994 "#,
4995 );
4996
4997 let schema = Schema::parse_str(schema_str.replace("{{{}}}", CUSTOM_ATTRS_SUFFIX).as_str())?;
4998
4999 match schema {
5000 Schema::Record(RecordSchema { name, fields, .. }) => {
5001 assert_eq!(name, Name::new("Rec")?);
5002 assert_eq!(fields.len(), 1);
5003 let field = &fields[0];
5004 assert_eq!(&field.name, "field_one");
5005 assert_eq!(field.custom_attributes, expected_custom_attributes());
5006 }
5007 _ => panic!("Expected Schema::Record"),
5008 }
5009
5010 Ok(())
5011 }
5012
5013 #[test]
5014 fn avro_3625_null_is_first() -> TestResult {
5015 let schema_str = String::from(
5016 r#"
5017 {
5018 "type": "record",
5019 "name": "union_schema_test",
5020 "fields": [
5021 {"name": "a", "type": ["null", "long"], "default": null}
5022 ]
5023 }
5024 "#,
5025 );
5026
5027 let schema = Schema::parse_str(&schema_str)?;
5028
5029 match schema {
5030 Schema::Record(RecordSchema { name, fields, .. }) => {
5031 assert_eq!(name, Name::new("union_schema_test")?);
5032 assert_eq!(fields.len(), 1);
5033 let field = &fields[0];
5034 assert_eq!(&field.name, "a");
5035 assert_eq!(&field.default, &Some(Value::Null));
5036 match &field.schema {
5037 Schema::Union(union) => {
5038 assert_eq!(union.variants().len(), 2);
5039 assert!(union.is_nullable());
5040 assert_eq!(union.variants()[0], Schema::Null);
5041 assert_eq!(union.variants()[1], Schema::Long);
5042 }
5043 _ => panic!("Expected Schema::Union"),
5044 }
5045 }
5046 _ => panic!("Expected Schema::Record"),
5047 }
5048
5049 Ok(())
5050 }
5051
5052 #[test]
5053 fn avro_3625_null_is_last() -> TestResult {
5054 let schema_str = String::from(
5055 r#"
5056 {
5057 "type": "record",
5058 "name": "union_schema_test",
5059 "fields": [
5060 {"name": "a", "type": ["long","null"], "default": 123}
5061 ]
5062 }
5063 "#,
5064 );
5065
5066 let schema = Schema::parse_str(&schema_str)?;
5067
5068 match schema {
5069 Schema::Record(RecordSchema { name, fields, .. }) => {
5070 assert_eq!(name, Name::new("union_schema_test")?);
5071 assert_eq!(fields.len(), 1);
5072 let field = &fields[0];
5073 assert_eq!(&field.name, "a");
5074 assert_eq!(&field.default, &Some(json!(123)));
5075 match &field.schema {
5076 Schema::Union(union) => {
5077 assert_eq!(union.variants().len(), 2);
5078 assert_eq!(union.variants()[0], Schema::Long);
5079 assert_eq!(union.variants()[1], Schema::Null);
5080 }
5081 _ => panic!("Expected Schema::Union"),
5082 }
5083 }
5084 _ => panic!("Expected Schema::Record"),
5085 }
5086
5087 Ok(())
5088 }
5089
5090 #[test]
5091 fn avro_3625_null_is_the_middle() -> TestResult {
5092 let schema_str = String::from(
5093 r#"
5094 {
5095 "type": "record",
5096 "name": "union_schema_test",
5097 "fields": [
5098 {"name": "a", "type": ["long","null","int"], "default": 123}
5099 ]
5100 }
5101 "#,
5102 );
5103
5104 let schema = Schema::parse_str(&schema_str)?;
5105
5106 match schema {
5107 Schema::Record(RecordSchema { name, fields, .. }) => {
5108 assert_eq!(name, Name::new("union_schema_test")?);
5109 assert_eq!(fields.len(), 1);
5110 let field = &fields[0];
5111 assert_eq!(&field.name, "a");
5112 assert_eq!(&field.default, &Some(json!(123)));
5113 match &field.schema {
5114 Schema::Union(union) => {
5115 assert_eq!(union.variants().len(), 3);
5116 assert_eq!(union.variants()[0], Schema::Long);
5117 assert_eq!(union.variants()[1], Schema::Null);
5118 assert_eq!(union.variants()[2], Schema::Int);
5119 }
5120 _ => panic!("Expected Schema::Union"),
5121 }
5122 }
5123 _ => panic!("Expected Schema::Record"),
5124 }
5125
5126 Ok(())
5127 }
5128
5129 #[test]
5130 fn avro_3649_default_notintfirst() -> TestResult {
5131 let schema_str = String::from(
5132 r#"
5133 {
5134 "type": "record",
5135 "name": "union_schema_test",
5136 "fields": [
5137 {"name": "a", "type": ["string", "int"], "default": 123}
5138 ]
5139 }
5140 "#,
5141 );
5142
5143 let schema = Schema::parse_str(&schema_str)?;
5144
5145 match schema {
5146 Schema::Record(RecordSchema { name, fields, .. }) => {
5147 assert_eq!(name, Name::new("union_schema_test")?);
5148 assert_eq!(fields.len(), 1);
5149 let field = &fields[0];
5150 assert_eq!(&field.name, "a");
5151 assert_eq!(&field.default, &Some(json!(123)));
5152 match &field.schema {
5153 Schema::Union(union) => {
5154 assert_eq!(union.variants().len(), 2);
5155 assert_eq!(union.variants()[0], Schema::String);
5156 assert_eq!(union.variants()[1], Schema::Int);
5157 }
5158 _ => panic!("Expected Schema::Union"),
5159 }
5160 }
5161 _ => panic!("Expected Schema::Record"),
5162 }
5163
5164 Ok(())
5165 }
5166
5167 #[test]
5168 fn avro_3709_parsing_of_record_field_aliases() -> TestResult {
5169 let schema = r#"
5170 {
5171 "name": "rec",
5172 "type": "record",
5173 "fields": [
5174 {
5175 "name": "num",
5176 "type": "int",
5177 "aliases": ["num1", "num2"]
5178 }
5179 ]
5180 }
5181 "#;
5182
5183 let schema = Schema::parse_str(schema)?;
5184 if let Schema::Record(RecordSchema { fields, .. }) = schema {
5185 let num_field = &fields[0];
5186 assert_eq!(num_field.name, "num");
5187 assert_eq!(num_field.aliases, Some(vec!("num1".into(), "num2".into())));
5188 } else {
5189 panic!("Expected a record schema!");
5190 }
5191
5192 Ok(())
5193 }
5194
5195 #[test]
5196 fn avro_3735_parse_enum_namespace() -> TestResult {
5197 let schema = r#"
5198 {
5199 "type": "record",
5200 "name": "Foo",
5201 "namespace": "name.space",
5202 "fields":
5203 [
5204 {
5205 "name": "barInit",
5206 "type":
5207 {
5208 "type": "enum",
5209 "name": "Bar",
5210 "symbols":
5211 [
5212 "bar0",
5213 "bar1"
5214 ]
5215 }
5216 },
5217 {
5218 "name": "barUse",
5219 "type": "Bar"
5220 }
5221 ]
5222 }
5223 "#;
5224
5225 #[derive(
5226 Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
5227 )]
5228 pub enum Bar {
5229 #[serde(rename = "bar0")]
5230 Bar0,
5231 #[serde(rename = "bar1")]
5232 Bar1,
5233 }
5234
5235 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
5236 pub struct Foo {
5237 #[serde(rename = "barInit")]
5238 pub bar_init: Bar,
5239 #[serde(rename = "barUse")]
5240 pub bar_use: Bar,
5241 }
5242
5243 let schema = Schema::parse_str(schema)?;
5244
5245 let foo = Foo {
5246 bar_init: Bar::Bar0,
5247 bar_use: Bar::Bar1,
5248 };
5249
5250 let avro_value = crate::to_value(foo)?;
5251 assert!(avro_value.validate(&schema));
5252
5253 let mut writer = crate::Writer::new(&schema, Vec::new());
5254
5255 writer.append(avro_value)?;
5257
5258 Ok(())
5259 }
5260
5261 #[test]
5262 fn avro_3755_deserialize() -> TestResult {
5263 #[derive(
5264 Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, serde::Deserialize, serde::Serialize,
5265 )]
5266 pub enum Bar {
5267 #[serde(rename = "bar0")]
5268 Bar0,
5269 #[serde(rename = "bar1")]
5270 Bar1,
5271 #[serde(rename = "bar2")]
5272 Bar2,
5273 }
5274
5275 #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
5276 pub struct Foo {
5277 #[serde(rename = "barInit")]
5278 pub bar_init: Bar,
5279 #[serde(rename = "barUse")]
5280 pub bar_use: Bar,
5281 }
5282
5283 let writer_schema = r#"{
5284 "type": "record",
5285 "name": "Foo",
5286 "fields":
5287 [
5288 {
5289 "name": "barInit",
5290 "type":
5291 {
5292 "type": "enum",
5293 "name": "Bar",
5294 "symbols":
5295 [
5296 "bar0",
5297 "bar1"
5298 ]
5299 }
5300 },
5301 {
5302 "name": "barUse",
5303 "type": "Bar"
5304 }
5305 ]
5306 }"#;
5307
5308 let reader_schema = r#"{
5309 "type": "record",
5310 "name": "Foo",
5311 "namespace": "name.space",
5312 "fields":
5313 [
5314 {
5315 "name": "barInit",
5316 "type":
5317 {
5318 "type": "enum",
5319 "name": "Bar",
5320 "symbols":
5321 [
5322 "bar0",
5323 "bar1",
5324 "bar2"
5325 ]
5326 }
5327 },
5328 {
5329 "name": "barUse",
5330 "type": "Bar"
5331 }
5332 ]
5333 }"#;
5334
5335 let writer_schema = Schema::parse_str(writer_schema)?;
5336 let foo = Foo {
5337 bar_init: Bar::Bar0,
5338 bar_use: Bar::Bar1,
5339 };
5340 let avro_value = crate::to_value(foo)?;
5341 assert!(
5342 avro_value.validate(&writer_schema),
5343 "value is valid for schema",
5344 );
5345 let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5346 let mut x = &datum[..];
5347 let reader_schema = Schema::parse_str(reader_schema)?;
5348 let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5349 match deser_value {
5350 types::Value::Record(fields) => {
5351 assert_eq!(fields.len(), 2);
5352 assert_eq!(fields[0].0, "barInit");
5353 assert_eq!(fields[0].1, types::Value::Enum(0, "bar0".to_string()));
5354 assert_eq!(fields[1].0, "barUse");
5355 assert_eq!(fields[1].1, types::Value::Enum(1, "bar1".to_string()));
5356 }
5357 _ => panic!("Expected Value::Record"),
5358 }
5359
5360 Ok(())
5361 }
5362
5363 #[test]
5364 fn test_avro_3780_decimal_schema_type_with_fixed() -> TestResult {
5365 let schema = json!(
5366 {
5367 "type": "record",
5368 "name": "recordWithDecimal",
5369 "fields": [
5370 {
5371 "name": "decimal",
5372 "type": "fixed",
5373 "name": "nestedFixed",
5374 "size": 8,
5375 "logicalType": "decimal",
5376 "precision": 4
5377 }
5378 ]
5379 });
5380
5381 let parse_result = Schema::parse(&schema);
5382 assert!(
5383 parse_result.is_ok(),
5384 "parse result must be ok, got: {parse_result:?}"
5385 );
5386
5387 Ok(())
5388 }
5389
5390 #[test]
5391 fn test_avro_3772_enum_default_wrong_type() -> TestResult {
5392 let schema = r#"
5393 {
5394 "type": "record",
5395 "name": "test",
5396 "fields": [
5397 {"name": "a", "type": "long", "default": 42},
5398 {"name": "b", "type": "string"},
5399 {
5400 "name": "c",
5401 "type": {
5402 "type": "enum",
5403 "name": "suit",
5404 "symbols": ["diamonds", "spades", "clubs", "hearts"],
5405 "default": 123
5406 }
5407 }
5408 ]
5409 }
5410 "#;
5411
5412 match Schema::parse_str(schema) {
5413 Err(err) => {
5414 assert_eq!(
5415 err.to_string(),
5416 "Default value for enum must be a string! Got: 123"
5417 );
5418 }
5419 _ => panic!("Expected an error"),
5420 }
5421 Ok(())
5422 }
5423
5424 #[test]
5425 fn test_avro_3812_handle_null_namespace_properly() -> TestResult {
5426 let schema_str = r#"
5427 {
5428 "namespace": "",
5429 "type": "record",
5430 "name": "my_schema",
5431 "fields": [
5432 {
5433 "name": "a",
5434 "type": {
5435 "type": "enum",
5436 "name": "my_enum",
5437 "namespace": "",
5438 "symbols": ["a", "b"]
5439 }
5440 }, {
5441 "name": "b",
5442 "type": {
5443 "type": "fixed",
5444 "name": "my_fixed",
5445 "namespace": "",
5446 "size": 10
5447 }
5448 }
5449 ]
5450 }
5451 "#;
5452
5453 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"a","type":{"name":"my_enum","type":"enum","symbols":["a","b"]}},{"name":"b","type":{"name":"my_fixed","type":"fixed","size":10}}]}"#;
5454 let schema = Schema::parse_str(schema_str)?;
5455 let canonical_form = schema.canonical_form();
5456 assert_eq!(canonical_form, expected);
5457
5458 let name = Name::new("my_name")?;
5459 let fullname = name.fullname(Some("".to_string()));
5460 assert_eq!(fullname, "my_name");
5461 let qname = name.fully_qualified_name(&Some("".to_string())).to_string();
5462 assert_eq!(qname, "my_name");
5463
5464 Ok(())
5465 }
5466
5467 #[test]
5468 fn test_avro_3818_inherit_enclosing_namespace() -> TestResult {
5469 let schema_str = r#"
5471 {
5472 "namespace": "my_ns",
5473 "type": "record",
5474 "name": "my_schema",
5475 "fields": [
5476 {
5477 "name": "f1",
5478 "type": {
5479 "name": "enum1",
5480 "type": "enum",
5481 "symbols": ["a"]
5482 }
5483 }, {
5484 "name": "f2",
5485 "type": {
5486 "name": "fixed1",
5487 "type": "fixed",
5488 "size": 1
5489 }
5490 }
5491 ]
5492 }
5493 "#;
5494
5495 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"my_ns.fixed1","type":"fixed","size":1}}]}"#;
5496 let schema = Schema::parse_str(schema_str)?;
5497 let canonical_form = schema.canonical_form();
5498 assert_eq!(canonical_form, expected);
5499
5500 let schema_str = r#"
5503 {
5504 "namespace": "my_ns",
5505 "type": "record",
5506 "name": "my_schema",
5507 "fields": [
5508 {
5509 "name": "f1",
5510 "type": {
5511 "name": "enum1",
5512 "type": "enum",
5513 "namespace": "",
5514 "symbols": ["a"]
5515 }
5516 }, {
5517 "name": "f2",
5518 "type": {
5519 "name": "fixed1",
5520 "type": "fixed",
5521 "namespace": "",
5522 "size": 1
5523 }
5524 }
5525 ]
5526 }
5527 "#;
5528
5529 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fixed1","type":"fixed","size":1}}]}"#;
5530 let schema = Schema::parse_str(schema_str)?;
5531 let canonical_form = schema.canonical_form();
5532 assert_eq!(canonical_form, expected);
5533
5534 let schema_str = r#"
5536 {
5537 "namespace": "",
5538 "type": "record",
5539 "name": "my_schema",
5540 "fields": [
5541 {
5542 "name": "f1",
5543 "type": {
5544 "name": "enum1",
5545 "type": "enum",
5546 "namespace": "f1.ns",
5547 "symbols": ["a"]
5548 }
5549 }, {
5550 "name": "f2",
5551 "type": {
5552 "name": "f2.ns.fixed1",
5553 "type": "fixed",
5554 "size": 1
5555 }
5556 }
5557 ]
5558 }
5559 "#;
5560
5561 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"f1.ns.enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"f2.ns.fixed1","type":"fixed","size":1}}]}"#;
5562 let schema = Schema::parse_str(schema_str)?;
5563 let canonical_form = schema.canonical_form();
5564 assert_eq!(canonical_form, expected);
5565
5566 let schema_str = r#"
5568 {
5569 "type": "record",
5570 "name": "my_ns.my_schema",
5571 "fields": [
5572 {
5573 "name": "f1",
5574 "type": {
5575 "name": "inner_record1",
5576 "type": "record",
5577 "fields": [
5578 {
5579 "name": "f1_1",
5580 "type": {
5581 "name": "enum1",
5582 "type": "enum",
5583 "symbols": ["a"]
5584 }
5585 }
5586 ]
5587 }
5588 }, {
5589 "name": "f2",
5590 "type": {
5591 "name": "inner_record2",
5592 "type": "record",
5593 "namespace": "inner_ns",
5594 "fields": [
5595 {
5596 "name": "f2_1",
5597 "type": {
5598 "name": "enum2",
5599 "type": "enum",
5600 "symbols": ["a"]
5601 }
5602 }
5603 ]
5604 }
5605 }
5606 ]
5607 }
5608 "#;
5609
5610 let expected = r#"{"name":"my_ns.my_schema","type":"record","fields":[{"name":"f1","type":{"name":"my_ns.inner_record1","type":"record","fields":[{"name":"f1_1","type":{"name":"my_ns.enum1","type":"enum","symbols":["a"]}}]}},{"name":"f2","type":{"name":"inner_ns.inner_record2","type":"record","fields":[{"name":"f2_1","type":{"name":"inner_ns.enum2","type":"enum","symbols":["a"]}}]}}]}"#;
5611 let schema = Schema::parse_str(schema_str)?;
5612 let canonical_form = schema.canonical_form();
5613 assert_eq!(canonical_form, expected);
5614
5615 Ok(())
5616 }
5617
5618 #[test]
5619 fn test_avro_3779_bigdecimal_schema() -> TestResult {
5620 let schema = json!(
5621 {
5622 "name": "decimal",
5623 "type": "bytes",
5624 "logicalType": "big-decimal"
5625 }
5626 );
5627
5628 let parse_result = Schema::parse(&schema);
5629 assert!(
5630 parse_result.is_ok(),
5631 "parse result must be ok, got: {parse_result:?}"
5632 );
5633 match parse_result? {
5634 Schema::BigDecimal => (),
5635 other => panic!("Expected Schema::BigDecimal but got: {other:?}"),
5636 }
5637
5638 Ok(())
5639 }
5640
5641 #[test]
5642 fn test_avro_3820_deny_invalid_field_names() -> TestResult {
5643 let schema_str = r#"
5644 {
5645 "name": "my_record",
5646 "type": "record",
5647 "fields": [
5648 {
5649 "name": "f1.x",
5650 "type": {
5651 "name": "my_enum",
5652 "type": "enum",
5653 "symbols": ["a"]
5654 }
5655 }, {
5656 "name": "f2",
5657 "type": {
5658 "name": "my_fixed",
5659 "type": "fixed",
5660 "size": 1
5661 }
5662 }
5663 ]
5664 }
5665 "#;
5666
5667 match Schema::parse_str(schema_str) {
5668 Err(Error::FieldName(x)) if x == "f1.x" => Ok(()),
5669 other => Err(format!("Expected Error::FieldName, got {other:?}").into()),
5670 }
5671 }
5672
5673 #[test]
5674 fn test_avro_3827_disallow_duplicate_field_names() -> TestResult {
5675 let schema_str = r#"
5676 {
5677 "name": "my_schema",
5678 "type": "record",
5679 "fields": [
5680 {
5681 "name": "f1",
5682 "type": {
5683 "name": "a",
5684 "type": "record",
5685 "fields": []
5686 }
5687 }, {
5688 "name": "f1",
5689 "type": {
5690 "name": "b",
5691 "type": "record",
5692 "fields": []
5693 }
5694 }
5695 ]
5696 }
5697 "#;
5698
5699 match Schema::parse_str(schema_str) {
5700 Err(Error::FieldNameDuplicate(_)) => (),
5701 other => {
5702 return Err(format!("Expected Error::FieldNameDuplicate, got {other:?}").into());
5703 }
5704 };
5705
5706 let schema_str = r#"
5707 {
5708 "name": "my_schema",
5709 "type": "record",
5710 "fields": [
5711 {
5712 "name": "f1",
5713 "type": {
5714 "name": "a",
5715 "type": "record",
5716 "fields": [
5717 {
5718 "name": "f1",
5719 "type": {
5720 "name": "b",
5721 "type": "record",
5722 "fields": []
5723 }
5724 }
5725 ]
5726 }
5727 }
5728 ]
5729 }
5730 "#;
5731
5732 let expected = r#"{"name":"my_schema","type":"record","fields":[{"name":"f1","type":{"name":"a","type":"record","fields":[{"name":"f1","type":{"name":"b","type":"record","fields":[]}}]}}]}"#;
5733 let schema = Schema::parse_str(schema_str)?;
5734 let canonical_form = schema.canonical_form();
5735 assert_eq!(canonical_form, expected);
5736
5737 Ok(())
5738 }
5739
5740 #[test]
5741 fn test_avro_3830_null_namespace_in_fully_qualified_names() -> TestResult {
5742 let schema_str = r#"
5745 {
5746 "name": ".record1",
5747 "namespace": "ns1",
5748 "type": "record",
5749 "fields": [
5750 {
5751 "name": "f1",
5752 "type": {
5753 "name": ".enum1",
5754 "namespace": "ns2",
5755 "type": "enum",
5756 "symbols": ["a"]
5757 }
5758 }, {
5759 "name": "f2",
5760 "type": {
5761 "name": ".fxed1",
5762 "namespace": "ns3",
5763 "type": "fixed",
5764 "size": 1
5765 }
5766 }
5767 ]
5768 }
5769 "#;
5770
5771 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5772 let schema = Schema::parse_str(schema_str)?;
5773 let canonical_form = schema.canonical_form();
5774 assert_eq!(canonical_form, expected);
5775
5776 let schema_str = r#"
5778 {
5779 "name": ".record1",
5780 "namespace": "ns1",
5781 "type": "record",
5782 "fields": [
5783 {
5784 "name": "f1",
5785 "type": {
5786 "name": "enum1",
5787 "type": "enum",
5788 "symbols": ["a"]
5789 }
5790 }, {
5791 "name": "f2",
5792 "type": {
5793 "name": "fxed1",
5794 "type": "fixed",
5795 "size": 1
5796 }
5797 }
5798 ]
5799 }
5800 "#;
5801
5802 let expected = r#"{"name":"record1","type":"record","fields":[{"name":"f1","type":{"name":"enum1","type":"enum","symbols":["a"]}},{"name":"f2","type":{"name":"fxed1","type":"fixed","size":1}}]}"#;
5803 let schema = Schema::parse_str(schema_str)?;
5804 let canonical_form = schema.canonical_form();
5805 assert_eq!(canonical_form, expected);
5806
5807 let name = Name::new(".my_name")?;
5808 let fullname = name.fullname(None);
5809 assert_eq!(fullname, "my_name");
5810 let qname = name.fully_qualified_name(&None).to_string();
5811 assert_eq!(qname, "my_name");
5812
5813 Ok(())
5814 }
5815
5816 #[test]
5817 fn test_avro_3814_schema_resolution_failure() -> TestResult {
5818 let reader_schema = json!(
5820 {
5821 "type": "record",
5822 "name": "MyOuterRecord",
5823 "fields": [
5824 {
5825 "name": "inner_record",
5826 "type": [
5827 "null",
5828 {
5829 "type": "record",
5830 "name": "MyRecord",
5831 "fields": [
5832 {"name": "a", "type": "string"}
5833 ]
5834 }
5835 ],
5836 "default": null
5837 }
5838 ]
5839 }
5840 );
5841
5842 let writer_schema = json!(
5845 {
5846 "type": "record",
5847 "name": "MyOuterRecord",
5848 "fields": [
5849 {
5850 "name": "inner_record",
5851 "type": [
5852 "null",
5853 {
5854 "type": "record",
5855 "name": "MyRecord",
5856 "fields": [
5857 {"name": "a", "type": "string"},
5858 {
5859 "name": "b",
5860 "type": [
5861 "null",
5862 {
5863 "type": "enum",
5864 "name": "MyEnum",
5865 "symbols": ["A", "B", "C"],
5866 "default": "C"
5867 }
5868 ],
5869 "default": null
5870 },
5871 ]
5872 }
5873 ]
5874 }
5875 ],
5876 "default": null
5877 }
5878 );
5879
5880 #[derive(Serialize, Deserialize, Debug)]
5883 struct MyInnerRecordReader {
5884 a: String,
5885 }
5886
5887 #[derive(Serialize, Deserialize, Debug)]
5888 struct MyRecordReader {
5889 inner_record: Option<MyInnerRecordReader>,
5890 }
5891
5892 #[derive(Serialize, Deserialize, Debug)]
5893 enum MyEnum {
5894 A,
5895 B,
5896 C,
5897 }
5898
5899 #[derive(Serialize, Deserialize, Debug)]
5900 struct MyInnerRecordWriter {
5901 a: String,
5902 b: Option<MyEnum>,
5903 }
5904
5905 #[derive(Serialize, Deserialize, Debug)]
5906 struct MyRecordWriter {
5907 inner_record: Option<MyInnerRecordWriter>,
5908 }
5909
5910 let s = MyRecordWriter {
5911 inner_record: Some(MyInnerRecordWriter {
5912 a: "foo".to_string(),
5913 b: None,
5914 }),
5915 };
5916
5917 let writer_schema = Schema::parse(&writer_schema)?;
5919 let avro_value = crate::to_value(s)?;
5920 assert!(
5921 avro_value.validate(&writer_schema),
5922 "value is valid for schema",
5923 );
5924 let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5925
5926 let reader_schema = Schema::parse(&reader_schema)?;
5928 let mut x = &datum[..];
5929
5930 let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5932 assert!(deser_value.validate(&reader_schema));
5933
5934 let d: MyRecordReader = crate::from_value(&deser_value)?;
5936 assert_eq!(d.inner_record.unwrap().a, "foo".to_string());
5937 Ok(())
5938 }
5939
5940 #[test]
5941 fn test_avro_3837_disallow_invalid_namespace() -> TestResult {
5942 let schema_str = r#"
5944 {
5945 "name": "record1",
5946 "namespace": "ns1",
5947 "type": "record",
5948 "fields": []
5949 }
5950 "#;
5951
5952 let expected = r#"{"name":"ns1.record1","type":"record","fields":[]}"#;
5953 let schema = Schema::parse_str(schema_str)?;
5954 let canonical_form = schema.canonical_form();
5955 assert_eq!(canonical_form, expected);
5956
5957 let schema_str = r#"
5959 {
5960 "name": "enum1",
5961 "namespace": "ns1.foo.bar",
5962 "type": "enum",
5963 "symbols": ["a"]
5964 }
5965 "#;
5966
5967 let expected = r#"{"name":"ns1.foo.bar.enum1","type":"enum","symbols":["a"]}"#;
5968 let schema = Schema::parse_str(schema_str)?;
5969 let canonical_form = schema.canonical_form();
5970 assert_eq!(canonical_form, expected);
5971
5972 let schema_str = r#"
5974 {
5975 "name": "fixed1",
5976 "namespace": ".ns1.a.b",
5977 "type": "fixed",
5978 "size": 1
5979 }
5980 "#;
5981
5982 match Schema::parse_str(schema_str) {
5983 Err(Error::InvalidNamespace(_, _)) => (),
5984 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
5985 };
5986
5987 let schema_str = r#"
5989 {
5990 "name": "record1",
5991 "namespace": "ns1.a*b.c",
5992 "type": "record",
5993 "fields": []
5994 }
5995 "#;
5996
5997 match Schema::parse_str(schema_str) {
5998 Err(Error::InvalidNamespace(_, _)) => (),
5999 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6000 };
6001
6002 let schema_str = r#"
6004 {
6005 "name": "fixed1",
6006 "namespace": "ns1.1a.b",
6007 "type": "fixed",
6008 "size": 1
6009 }
6010 "#;
6011
6012 match Schema::parse_str(schema_str) {
6013 Err(Error::InvalidNamespace(_, _)) => (),
6014 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6015 };
6016
6017 let schema_str = r#"
6019 {
6020 "name": "fixed1",
6021 "namespace": "ns1..a",
6022 "type": "fixed",
6023 "size": 1
6024 }
6025 "#;
6026
6027 match Schema::parse_str(schema_str) {
6028 Err(Error::InvalidNamespace(_, _)) => (),
6029 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6030 };
6031
6032 let schema_str = r#"
6034 {
6035 "name": "fixed1",
6036 "namespace": "ns1.a.",
6037 "type": "fixed",
6038 "size": 1
6039 }
6040 "#;
6041
6042 match Schema::parse_str(schema_str) {
6043 Err(Error::InvalidNamespace(_, _)) => (),
6044 other => return Err(format!("Expected Error::InvalidNamespace, got {other:?}").into()),
6045 };
6046
6047 Ok(())
6048 }
6049
6050 #[test]
6051 fn test_avro_3851_validate_default_value_of_simple_record_field() -> TestResult {
6052 let schema_str = r#"
6053 {
6054 "name": "record1",
6055 "namespace": "ns",
6056 "type": "record",
6057 "fields": [
6058 {
6059 "name": "f1",
6060 "type": "int",
6061 "default": "invalid"
6062 }
6063 ]
6064 }
6065 "#;
6066 let expected = Error::GetDefaultRecordField(
6067 "f1".to_string(),
6068 "ns.record1".to_string(),
6069 r#""int""#.to_string(),
6070 )
6071 .to_string();
6072 let result = Schema::parse_str(schema_str);
6073 assert!(result.is_err());
6074 let err = result
6075 .map_err(|e| e.to_string())
6076 .err()
6077 .unwrap_or_else(|| "unexpected".to_string());
6078 assert_eq!(expected, err);
6079
6080 Ok(())
6081 }
6082
6083 #[test]
6084 fn test_avro_3851_validate_default_value_of_nested_record_field() -> TestResult {
6085 let schema_str = r#"
6086 {
6087 "name": "record1",
6088 "namespace": "ns",
6089 "type": "record",
6090 "fields": [
6091 {
6092 "name": "f1",
6093 "type": {
6094 "name": "record2",
6095 "type": "record",
6096 "fields": [
6097 {
6098 "name": "f1_1",
6099 "type": "int"
6100 }
6101 ]
6102 },
6103 "default": "invalid"
6104 }
6105 ]
6106 }
6107 "#;
6108 let expected = Error::GetDefaultRecordField(
6109 "f1".to_string(),
6110 "ns.record1".to_string(),
6111 r#"{"name":"ns.record2","type":"record","fields":[{"name":"f1_1","type":"int"}]}"#
6112 .to_string(),
6113 )
6114 .to_string();
6115 let result = Schema::parse_str(schema_str);
6116 assert!(result.is_err());
6117 let err = result
6118 .map_err(|e| e.to_string())
6119 .err()
6120 .unwrap_or_else(|| "unexpected".to_string());
6121 assert_eq!(expected, err);
6122
6123 Ok(())
6124 }
6125
6126 #[test]
6127 fn test_avro_3851_validate_default_value_of_enum_record_field() -> TestResult {
6128 let schema_str = r#"
6129 {
6130 "name": "record1",
6131 "namespace": "ns",
6132 "type": "record",
6133 "fields": [
6134 {
6135 "name": "f1",
6136 "type": {
6137 "name": "enum1",
6138 "type": "enum",
6139 "symbols": ["a", "b", "c"]
6140 },
6141 "default": "invalid"
6142 }
6143 ]
6144 }
6145 "#;
6146 let expected = Error::GetDefaultRecordField(
6147 "f1".to_string(),
6148 "ns.record1".to_string(),
6149 r#"{"name":"ns.enum1","type":"enum","symbols":["a","b","c"]}"#.to_string(),
6150 )
6151 .to_string();
6152 let result = Schema::parse_str(schema_str);
6153 assert!(result.is_err());
6154 let err = result
6155 .map_err(|e| e.to_string())
6156 .err()
6157 .unwrap_or_else(|| "unexpected".to_string());
6158 assert_eq!(expected, err);
6159
6160 Ok(())
6161 }
6162
6163 #[test]
6164 fn test_avro_3851_validate_default_value_of_fixed_record_field() -> TestResult {
6165 let schema_str = r#"
6166 {
6167 "name": "record1",
6168 "namespace": "ns",
6169 "type": "record",
6170 "fields": [
6171 {
6172 "name": "f1",
6173 "type": {
6174 "name": "fixed1",
6175 "type": "fixed",
6176 "size": 3
6177 },
6178 "default": 100
6179 }
6180 ]
6181 }
6182 "#;
6183 let expected = Error::GetDefaultRecordField(
6184 "f1".to_string(),
6185 "ns.record1".to_string(),
6186 r#"{"name":"ns.fixed1","type":"fixed","size":3}"#.to_string(),
6187 )
6188 .to_string();
6189 let result = Schema::parse_str(schema_str);
6190 assert!(result.is_err());
6191 let err = result
6192 .map_err(|e| e.to_string())
6193 .err()
6194 .unwrap_or_else(|| "unexpected".to_string());
6195 assert_eq!(expected, err);
6196
6197 Ok(())
6198 }
6199
6200 #[test]
6201 fn test_avro_3851_validate_default_value_of_array_record_field() -> TestResult {
6202 let schema_str = r#"
6203 {
6204 "name": "record1",
6205 "namespace": "ns",
6206 "type": "record",
6207 "fields": [
6208 {
6209 "name": "f1",
6210 "type": "array",
6211 "items": "int",
6212 "default": "invalid"
6213 }
6214 ]
6215 }
6216 "#;
6217 let expected = Error::GetDefaultRecordField(
6218 "f1".to_string(),
6219 "ns.record1".to_string(),
6220 r#"{"type":"array","items":"int"}"#.to_string(),
6221 )
6222 .to_string();
6223 let result = Schema::parse_str(schema_str);
6224 assert!(result.is_err());
6225 let err = result
6226 .map_err(|e| e.to_string())
6227 .err()
6228 .unwrap_or_else(|| "unexpected".to_string());
6229 assert_eq!(expected, err);
6230
6231 Ok(())
6232 }
6233
6234 #[test]
6235 fn test_avro_3851_validate_default_value_of_map_record_field() -> TestResult {
6236 let schema_str = r#"
6237 {
6238 "name": "record1",
6239 "namespace": "ns",
6240 "type": "record",
6241 "fields": [
6242 {
6243 "name": "f1",
6244 "type": "map",
6245 "values": "string",
6246 "default": "invalid"
6247 }
6248 ]
6249 }
6250 "#;
6251 let expected = Error::GetDefaultRecordField(
6252 "f1".to_string(),
6253 "ns.record1".to_string(),
6254 r#"{"type":"map","values":"string"}"#.to_string(),
6255 )
6256 .to_string();
6257 let result = Schema::parse_str(schema_str);
6258 assert!(result.is_err());
6259 let err = result
6260 .map_err(|e| e.to_string())
6261 .err()
6262 .unwrap_or_else(|| "unexpected".to_string());
6263 assert_eq!(expected, err);
6264
6265 Ok(())
6266 }
6267
6268 #[test]
6269 fn test_avro_3851_validate_default_value_of_ref_record_field() -> TestResult {
6270 let schema_str = r#"
6271 {
6272 "name": "record1",
6273 "namespace": "ns",
6274 "type": "record",
6275 "fields": [
6276 {
6277 "name": "f1",
6278 "type": {
6279 "name": "record2",
6280 "type": "record",
6281 "fields": [
6282 {
6283 "name": "f1_1",
6284 "type": "int"
6285 }
6286 ]
6287 }
6288 }, {
6289 "name": "f2",
6290 "type": "ns.record2",
6291 "default": { "f1_1": true }
6292 }
6293 ]
6294 }
6295 "#;
6296 let expected = Error::GetDefaultRecordField(
6297 "f2".to_string(),
6298 "ns.record1".to_string(),
6299 r#""ns.record2""#.to_string(),
6300 )
6301 .to_string();
6302 let result = Schema::parse_str(schema_str);
6303 assert!(result.is_err());
6304 let err = result
6305 .map_err(|e| e.to_string())
6306 .err()
6307 .unwrap_or_else(|| "unexpected".to_string());
6308 assert_eq!(expected, err);
6309
6310 Ok(())
6311 }
6312
6313 #[test]
6314 fn test_avro_3851_validate_default_value_of_enum() -> TestResult {
6315 let schema_str = r#"
6316 {
6317 "name": "enum1",
6318 "namespace": "ns",
6319 "type": "enum",
6320 "symbols": ["a", "b", "c"],
6321 "default": 100
6322 }
6323 "#;
6324 let expected = Error::EnumDefaultWrongType(100.into()).to_string();
6325 let result = Schema::parse_str(schema_str);
6326 assert!(result.is_err());
6327 let err = result
6328 .map_err(|e| e.to_string())
6329 .err()
6330 .unwrap_or_else(|| "unexpected".to_string());
6331 assert_eq!(expected, err);
6332
6333 let schema_str = r#"
6334 {
6335 "name": "enum1",
6336 "namespace": "ns",
6337 "type": "enum",
6338 "symbols": ["a", "b", "c"],
6339 "default": "d"
6340 }
6341 "#;
6342 let expected = Error::GetEnumDefault {
6343 symbol: "d".to_string(),
6344 symbols: vec!["a".to_string(), "b".to_string(), "c".to_string()],
6345 }
6346 .to_string();
6347 let result = Schema::parse_str(schema_str);
6348 assert!(result.is_err());
6349 let err = result
6350 .map_err(|e| e.to_string())
6351 .err()
6352 .unwrap_or_else(|| "unexpected".to_string());
6353 assert_eq!(expected, err);
6354
6355 Ok(())
6356 }
6357
6358 #[test]
6359 fn test_avro_3862_get_aliases() -> TestResult {
6360 let schema_str = r#"
6362 {
6363 "name": "record1",
6364 "namespace": "ns1",
6365 "type": "record",
6366 "aliases": ["r1", "ns2.r2"],
6367 "fields": [
6368 { "name": "f1", "type": "int" },
6369 { "name": "f2", "type": "string" }
6370 ]
6371 }
6372 "#;
6373 let schema = Schema::parse_str(schema_str)?;
6374 let expected = vec![Alias::new("ns1.r1")?, Alias::new("ns2.r2")?];
6375 match schema.aliases() {
6376 Some(aliases) => assert_eq!(aliases, &expected),
6377 None => panic!("Expected Some({expected:?}), got None"),
6378 }
6379
6380 let schema_str = r#"
6381 {
6382 "name": "record1",
6383 "namespace": "ns1",
6384 "type": "record",
6385 "fields": [
6386 { "name": "f1", "type": "int" },
6387 { "name": "f2", "type": "string" }
6388 ]
6389 }
6390 "#;
6391 let schema = Schema::parse_str(schema_str)?;
6392 match schema.aliases() {
6393 None => (),
6394 some => panic!("Expected None, got {some:?}"),
6395 }
6396
6397 let schema_str = r#"
6399 {
6400 "name": "enum1",
6401 "namespace": "ns1",
6402 "type": "enum",
6403 "aliases": ["en1", "ns2.en2"],
6404 "symbols": ["a", "b", "c"]
6405 }
6406 "#;
6407 let schema = Schema::parse_str(schema_str)?;
6408 let expected = vec![Alias::new("ns1.en1")?, Alias::new("ns2.en2")?];
6409 match schema.aliases() {
6410 Some(aliases) => assert_eq!(aliases, &expected),
6411 None => panic!("Expected Some({expected:?}), got None"),
6412 }
6413
6414 let schema_str = r#"
6415 {
6416 "name": "enum1",
6417 "namespace": "ns1",
6418 "type": "enum",
6419 "symbols": ["a", "b", "c"]
6420 }
6421 "#;
6422 let schema = Schema::parse_str(schema_str)?;
6423 match schema.aliases() {
6424 None => (),
6425 some => panic!("Expected None, got {some:?}"),
6426 }
6427
6428 let schema_str = r#"
6430 {
6431 "name": "fixed1",
6432 "namespace": "ns1",
6433 "type": "fixed",
6434 "aliases": ["fx1", "ns2.fx2"],
6435 "size": 10
6436 }
6437 "#;
6438 let schema = Schema::parse_str(schema_str)?;
6439 let expected = vec![Alias::new("ns1.fx1")?, Alias::new("ns2.fx2")?];
6440 match schema.aliases() {
6441 Some(aliases) => assert_eq!(aliases, &expected),
6442 None => panic!("Expected Some({expected:?}), got None"),
6443 }
6444
6445 let schema_str = r#"
6446 {
6447 "name": "fixed1",
6448 "namespace": "ns1",
6449 "type": "fixed",
6450 "size": 10
6451 }
6452 "#;
6453 let schema = Schema::parse_str(schema_str)?;
6454 match schema.aliases() {
6455 None => (),
6456 some => panic!("Expected None, got {some:?}"),
6457 }
6458
6459 let schema = Schema::Int;
6461 match schema.aliases() {
6462 None => (),
6463 some => panic!("Expected None, got {some:?}"),
6464 }
6465
6466 Ok(())
6467 }
6468
6469 #[test]
6470 fn test_avro_3862_get_doc() -> TestResult {
6471 let schema_str = r#"
6473 {
6474 "name": "record1",
6475 "type": "record",
6476 "doc": "Record Document",
6477 "fields": [
6478 { "name": "f1", "type": "int" },
6479 { "name": "f2", "type": "string" }
6480 ]
6481 }
6482 "#;
6483 let schema = Schema::parse_str(schema_str)?;
6484 let expected = "Record Document";
6485 match schema.doc() {
6486 Some(doc) => assert_eq!(doc, expected),
6487 None => panic!("Expected Some({expected:?}), got None"),
6488 }
6489
6490 let schema_str = r#"
6491 {
6492 "name": "record1",
6493 "type": "record",
6494 "fields": [
6495 { "name": "f1", "type": "int" },
6496 { "name": "f2", "type": "string" }
6497 ]
6498 }
6499 "#;
6500 let schema = Schema::parse_str(schema_str)?;
6501 match schema.doc() {
6502 None => (),
6503 some => panic!("Expected None, got {some:?}"),
6504 }
6505
6506 let schema_str = r#"
6508 {
6509 "name": "enum1",
6510 "type": "enum",
6511 "doc": "Enum Document",
6512 "symbols": ["a", "b", "c"]
6513 }
6514 "#;
6515 let schema = Schema::parse_str(schema_str)?;
6516 let expected = "Enum Document";
6517 match schema.doc() {
6518 Some(doc) => assert_eq!(doc, expected),
6519 None => panic!("Expected Some({expected:?}), got None"),
6520 }
6521
6522 let schema_str = r#"
6523 {
6524 "name": "enum1",
6525 "type": "enum",
6526 "symbols": ["a", "b", "c"]
6527 }
6528 "#;
6529 let schema = Schema::parse_str(schema_str)?;
6530 match schema.doc() {
6531 None => (),
6532 some => panic!("Expected None, got {some:?}"),
6533 }
6534
6535 let schema_str = r#"
6537 {
6538 "name": "fixed1",
6539 "type": "fixed",
6540 "doc": "Fixed Document",
6541 "size": 10
6542 }
6543 "#;
6544 let schema = Schema::parse_str(schema_str)?;
6545 let expected = "Fixed Document";
6546 match schema.doc() {
6547 Some(doc) => assert_eq!(doc, expected),
6548 None => panic!("Expected Some({expected:?}), got None"),
6549 }
6550
6551 let schema_str = r#"
6552 {
6553 "name": "fixed1",
6554 "type": "fixed",
6555 "size": 10
6556 }
6557 "#;
6558 let schema = Schema::parse_str(schema_str)?;
6559 match schema.doc() {
6560 None => (),
6561 some => panic!("Expected None, got {some:?}"),
6562 }
6563
6564 let schema = Schema::Int;
6566 match schema.doc() {
6567 None => (),
6568 some => panic!("Expected None, got {some:?}"),
6569 }
6570
6571 Ok(())
6572 }
6573
6574 #[test]
6575 fn avro_3886_serialize_attributes() -> TestResult {
6576 let attributes = BTreeMap::from([
6577 ("string_key".into(), "value".into()),
6578 ("number_key".into(), 1.23.into()),
6579 ("null_key".into(), Value::Null),
6580 (
6581 "array_key".into(),
6582 Value::Array(vec![1.into(), 2.into(), 3.into()]),
6583 ),
6584 ("object_key".into(), Value::Object(Map::default())),
6585 ]);
6586
6587 let schema = Schema::Enum(EnumSchema {
6589 name: Name::new("a")?,
6590 aliases: None,
6591 doc: None,
6592 symbols: vec![],
6593 default: None,
6594 attributes: attributes.clone(),
6595 });
6596 let serialized = serde_json::to_string(&schema)?;
6597 assert_eq!(
6598 r#"{"type":"enum","name":"a","symbols":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6599 &serialized
6600 );
6601
6602 let schema = Schema::Fixed(FixedSchema {
6604 name: Name::new("a")?,
6605 aliases: None,
6606 doc: None,
6607 size: 1,
6608 default: None,
6609 attributes: attributes.clone(),
6610 });
6611 let serialized = serde_json::to_string(&schema)?;
6612 assert_eq!(
6613 r#"{"type":"fixed","name":"a","size":1,"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6614 &serialized
6615 );
6616
6617 let schema = Schema::Record(RecordSchema {
6619 name: Name::new("a")?,
6620 aliases: None,
6621 doc: None,
6622 fields: vec![],
6623 lookup: BTreeMap::new(),
6624 attributes,
6625 });
6626 let serialized = serde_json::to_string(&schema)?;
6627 assert_eq!(
6628 r#"{"type":"record","name":"a","fields":[],"array_key":[1,2,3],"null_key":null,"number_key":1.23,"object_key":{},"string_key":"value"}"#,
6629 &serialized
6630 );
6631
6632 Ok(())
6633 }
6634
6635 #[test]
6638 fn test_avro_3897_funny_valid_names_and_namespaces() -> TestResult {
6639 for funny_name in ["_", "_._", "__._", "_.__", "_._._"] {
6640 let name = Name::new(funny_name);
6641 assert!(name.is_ok());
6642 }
6643 Ok(())
6644 }
6645
6646 #[test]
6647 fn test_avro_3896_decimal_schema() -> TestResult {
6648 let schema = json!(
6650 {
6651 "type": "bytes",
6652 "name": "BytesDecimal",
6653 "logicalType": "decimal",
6654 "size": 38,
6655 "precision": 9,
6656 "scale": 2
6657 });
6658 let parse_result = Schema::parse(&schema)?;
6659 assert!(matches!(
6660 parse_result,
6661 Schema::Decimal(DecimalSchema {
6662 precision: 9,
6663 scale: 2,
6664 ..
6665 })
6666 ));
6667
6668 let schema = json!(
6670 {
6671 "type": "long",
6672 "name": "LongDecimal",
6673 "logicalType": "decimal"
6674 });
6675 let parse_result = Schema::parse(&schema)?;
6676 assert_eq!(parse_result, Schema::Long);
6678
6679 Ok(())
6680 }
6681
6682 #[test]
6683 fn avro_3896_uuid_schema_for_string() -> TestResult {
6684 let schema = json!(
6686 {
6687 "type": "string",
6688 "name": "StringUUID",
6689 "logicalType": "uuid"
6690 });
6691 let parse_result = Schema::parse(&schema)?;
6692 assert_eq!(parse_result, Schema::Uuid);
6693
6694 Ok(())
6695 }
6696
6697 #[test]
6698 #[serial(serde_is_human_readable)]
6699 fn avro_rs_53_uuid_with_fixed() -> TestResult {
6700 #[derive(Debug, Serialize, Deserialize)]
6701 struct Comment {
6702 id: crate::Uuid,
6703 }
6704
6705 impl AvroSchema for Comment {
6706 fn get_schema() -> Schema {
6707 Schema::parse_str(
6708 r#"{
6709 "type" : "record",
6710 "name" : "Comment",
6711 "fields" : [ {
6712 "name" : "id",
6713 "type" : {
6714 "type" : "fixed",
6715 "size" : 16,
6716 "logicalType" : "uuid",
6717 "name": "FixedUUID"
6718 }
6719 } ]
6720 }"#,
6721 )
6722 .expect("Invalid Comment Avro schema")
6723 }
6724 }
6725
6726 let payload = Comment {
6727 id: "de2df598-9948-4988-b00a-a41c0e287398".parse()?,
6728 };
6729 let mut buffer = Vec::new();
6730
6731 crate::util::SERDE_HUMAN_READABLE.store(true, Ordering::Release);
6733 let bytes = SpecificSingleObjectWriter::<Comment>::with_capacity(64)?
6734 .write_ref(&payload, &mut buffer)?;
6735 assert_eq!(bytes, 47);
6736
6737 crate::util::SERDE_HUMAN_READABLE.store(false, Ordering::Release);
6739 let bytes = SpecificSingleObjectWriter::<Comment>::with_capacity(64)?
6740 .write_ref(&payload, &mut buffer)?;
6741 assert_eq!(bytes, 27);
6742
6743 Ok(())
6744 }
6745
6746 #[test]
6747 fn avro_3926_uuid_schema_for_fixed_with_size_16() -> TestResult {
6748 let schema = json!(
6749 {
6750 "type": "fixed",
6751 "name": "FixedUUID",
6752 "size": 16,
6753 "logicalType": "uuid"
6754 });
6755 let parse_result = Schema::parse(&schema)?;
6756 assert_eq!(parse_result, Schema::Uuid);
6757 assert_not_logged(
6758 r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, attributes: {"logicalType": String("uuid")} })"#,
6759 );
6760
6761 Ok(())
6762 }
6763
6764 #[test]
6765 fn avro_3926_uuid_schema_for_fixed_with_size_different_than_16() -> TestResult {
6766 let schema = json!(
6767 {
6768 "type": "fixed",
6769 "name": "FixedUUID",
6770 "size": 6,
6771 "logicalType": "uuid"
6772 });
6773 let parse_result = Schema::parse(&schema)?;
6774
6775 assert_eq!(
6776 parse_result,
6777 Schema::Fixed(FixedSchema {
6778 name: Name::new("FixedUUID")?,
6779 aliases: None,
6780 doc: None,
6781 size: 6,
6782 default: None,
6783 attributes: BTreeMap::from([("logicalType".to_string(), "uuid".into())]),
6784 })
6785 );
6786 assert_logged(
6787 r#"Ignoring uuid logical type for a Fixed schema because its size (6) is not 16! Schema: Fixed(FixedSchema { name: Name { name: "FixedUUID", namespace: None }, aliases: None, doc: None, size: 6, default: None, attributes: {"logicalType": String("uuid")} })"#,
6788 );
6789
6790 Ok(())
6791 }
6792
6793 #[test]
6794 fn test_avro_3896_timestamp_millis_schema() -> TestResult {
6795 let schema = json!(
6797 {
6798 "type": "long",
6799 "name": "LongTimestampMillis",
6800 "logicalType": "timestamp-millis"
6801 });
6802 let parse_result = Schema::parse(&schema)?;
6803 assert_eq!(parse_result, Schema::TimestampMillis);
6804
6805 let schema = json!(
6807 {
6808 "type": "int",
6809 "name": "IntTimestampMillis",
6810 "logicalType": "timestamp-millis"
6811 });
6812 let parse_result = Schema::parse(&schema)?;
6813 assert_eq!(parse_result, Schema::Int);
6814
6815 Ok(())
6816 }
6817
6818 #[test]
6819 fn test_avro_3896_custom_bytes_schema() -> TestResult {
6820 let schema = json!(
6822 {
6823 "type": "bytes",
6824 "name": "BytesLog",
6825 "logicalType": "custom"
6826 });
6827 let parse_result = Schema::parse(&schema)?;
6828 assert_eq!(parse_result, Schema::Bytes);
6829 assert_eq!(parse_result.custom_attributes(), None);
6830
6831 Ok(())
6832 }
6833
6834 #[test]
6835 fn test_avro_3899_parse_decimal_type() -> TestResult {
6836 let schema = Schema::parse_str(
6837 r#"{
6838 "name": "InvalidDecimal",
6839 "type": "fixed",
6840 "size": 16,
6841 "logicalType": "decimal",
6842 "precision": 2,
6843 "scale": 3
6844 }"#,
6845 )?;
6846 match schema {
6847 Schema::Fixed(fixed_schema) => {
6848 let attrs = fixed_schema.attributes;
6849 let precision = attrs
6850 .get("precision")
6851 .expect("The 'precision' attribute is missing");
6852 let scale = attrs
6853 .get("scale")
6854 .expect("The 'scale' attribute is missing");
6855 assert_logged(&format!("Ignoring invalid decimal logical type: The decimal precision ({precision}) must be bigger or equal to the scale ({scale})"));
6856 }
6857 _ => unreachable!("Expected Schema::Fixed, got {:?}", schema),
6858 }
6859
6860 let schema = Schema::parse_str(
6861 r#"{
6862 "name": "ValidDecimal",
6863 "type": "bytes",
6864 "logicalType": "decimal",
6865 "precision": 3,
6866 "scale": 2
6867 }"#,
6868 )?;
6869 match schema {
6870 Schema::Decimal(_) => {
6871 assert_not_logged("Ignoring invalid decimal logical type: The decimal precision (2) must be bigger or equal to the scale (3)");
6872 }
6873 _ => unreachable!("Expected Schema::Decimal, got {:?}", schema),
6874 }
6875
6876 Ok(())
6877 }
6878
6879 #[test]
6880 fn avro_3920_serialize_record_with_custom_attributes() -> TestResult {
6881 let expected = {
6882 let mut lookup = BTreeMap::new();
6883 lookup.insert("value".to_owned(), 0);
6884 Schema::Record(RecordSchema {
6885 name: Name {
6886 name: "LongList".to_owned(),
6887 namespace: None,
6888 },
6889 aliases: Some(vec![Alias::new("LinkedLongs").unwrap()]),
6890 doc: None,
6891 fields: vec![RecordField {
6892 name: "value".to_string(),
6893 doc: None,
6894 default: None,
6895 aliases: None,
6896 schema: Schema::Long,
6897 order: RecordFieldOrder::Ascending,
6898 position: 0,
6899 custom_attributes: BTreeMap::from([("field-id".to_string(), 1.into())]),
6900 }],
6901 lookup,
6902 attributes: BTreeMap::from([("custom-attribute".to_string(), "value".into())]),
6903 })
6904 };
6905
6906 let value = serde_json::to_value(&expected)?;
6907 let serialized = serde_json::to_string(&value)?;
6908 assert_eq!(
6909 r#"{"aliases":["LinkedLongs"],"custom-attribute":"value","fields":[{"field-id":1,"name":"value","type":"long"}],"name":"LongList","type":"record"}"#,
6910 &serialized
6911 );
6912 assert_eq!(expected, Schema::parse_str(&serialized)?);
6913
6914 Ok(())
6915 }
6916
6917 #[test]
6918 fn test_avro_3925_serialize_decimal_inner_fixed() -> TestResult {
6919 let schema = Schema::Decimal(DecimalSchema {
6920 precision: 36,
6921 scale: 10,
6922 inner: Box::new(Schema::Fixed(FixedSchema {
6923 name: Name::new("decimal_36_10").unwrap(),
6924 aliases: None,
6925 doc: None,
6926 size: 16,
6927 default: None,
6928 attributes: Default::default(),
6929 })),
6930 });
6931
6932 let serialized_json = serde_json::to_string_pretty(&schema)?;
6933
6934 let expected_json = r#"{
6935 "type": "fixed",
6936 "name": "decimal_36_10",
6937 "size": 16,
6938 "logicalType": "decimal",
6939 "scale": 10,
6940 "precision": 36
6941}"#;
6942
6943 assert_eq!(serialized_json, expected_json);
6944
6945 Ok(())
6946 }
6947
6948 #[test]
6949 fn test_avro_3925_serialize_decimal_inner_bytes() -> TestResult {
6950 let schema = Schema::Decimal(DecimalSchema {
6951 precision: 36,
6952 scale: 10,
6953 inner: Box::new(Schema::Bytes),
6954 });
6955
6956 let serialized_json = serde_json::to_string_pretty(&schema)?;
6957
6958 let expected_json = r#"{
6959 "type": "bytes",
6960 "logicalType": "decimal",
6961 "scale": 10,
6962 "precision": 36
6963}"#;
6964
6965 assert_eq!(serialized_json, expected_json);
6966
6967 Ok(())
6968 }
6969
6970 #[test]
6971 fn test_avro_3925_serialize_decimal_inner_invalid() -> TestResult {
6972 let schema = Schema::Decimal(DecimalSchema {
6973 precision: 36,
6974 scale: 10,
6975 inner: Box::new(Schema::String),
6976 });
6977
6978 let serialized_json = serde_json::to_string_pretty(&schema);
6979
6980 assert!(serialized_json.is_err());
6981
6982 Ok(())
6983 }
6984
6985 #[test]
6986 fn test_avro_3927_serialize_array_with_custom_attributes() -> TestResult {
6987 let expected = Schema::array_with_attributes(
6988 Schema::Long,
6989 BTreeMap::from([("field-id".to_string(), "1".into())]),
6990 );
6991
6992 let value = serde_json::to_value(&expected)?;
6993 let serialized = serde_json::to_string(&value)?;
6994 assert_eq!(
6995 r#"{"field-id":"1","items":"long","type":"array"}"#,
6996 &serialized
6997 );
6998 let actual_schema = Schema::parse_str(&serialized)?;
6999 assert_eq!(expected, actual_schema);
7000 assert_eq!(
7001 expected.custom_attributes(),
7002 actual_schema.custom_attributes()
7003 );
7004
7005 Ok(())
7006 }
7007
7008 #[test]
7009 fn test_avro_3927_serialize_map_with_custom_attributes() -> TestResult {
7010 let expected = Schema::map_with_attributes(
7011 Schema::Long,
7012 BTreeMap::from([("field-id".to_string(), "1".into())]),
7013 );
7014
7015 let value = serde_json::to_value(&expected)?;
7016 let serialized = serde_json::to_string(&value)?;
7017 assert_eq!(
7018 r#"{"field-id":"1","type":"map","values":"long"}"#,
7019 &serialized
7020 );
7021 let actual_schema = Schema::parse_str(&serialized)?;
7022 assert_eq!(expected, actual_schema);
7023 assert_eq!(
7024 expected.custom_attributes(),
7025 actual_schema.custom_attributes()
7026 );
7027
7028 Ok(())
7029 }
7030
7031 #[test]
7032 fn avro_3928_parse_int_based_schema_with_default() -> TestResult {
7033 let schema = r#"
7034 {
7035 "type": "record",
7036 "name": "DateLogicalType",
7037 "fields": [ {
7038 "name": "birthday",
7039 "type": {"type": "int", "logicalType": "date"},
7040 "default": 1681601653
7041 } ]
7042 }"#;
7043
7044 match Schema::parse_str(schema)? {
7045 Schema::Record(record_schema) => {
7046 assert_eq!(record_schema.fields.len(), 1);
7047 let field = record_schema.fields.first().unwrap();
7048 assert_eq!(field.name, "birthday");
7049 assert_eq!(field.schema, Schema::Date);
7050 assert_eq!(
7051 types::Value::from(field.default.clone().unwrap()),
7052 types::Value::Int(1681601653)
7053 );
7054 }
7055 _ => unreachable!("Expected Schema::Record"),
7056 }
7057
7058 Ok(())
7059 }
7060
7061 #[test]
7062 fn avro_3946_union_with_single_type() -> TestResult {
7063 let schema = r#"
7064 {
7065 "type": "record",
7066 "name": "Issue",
7067 "namespace": "invalid.example",
7068 "fields": [
7069 {
7070 "name": "myField",
7071 "type": ["long"]
7072 }
7073 ]
7074 }"#;
7075
7076 let _ = Schema::parse_str(schema)?;
7077
7078 assert_logged(
7079 "Union schema with just one member! Consider dropping the union! \
7080 Please enable debug logging to find out which Record schema \
7081 declares the union with 'RUST_LOG=apache_avro::schema=debug'.",
7082 );
7083
7084 Ok(())
7085 }
7086
7087 #[test]
7088 fn avro_3946_union_without_any_types() -> TestResult {
7089 let schema = r#"
7090 {
7091 "type": "record",
7092 "name": "Issue",
7093 "namespace": "invalid.example",
7094 "fields": [
7095 {
7096 "name": "myField",
7097 "type": []
7098 }
7099 ]
7100 }"#;
7101
7102 let _ = Schema::parse_str(schema)?;
7103
7104 assert_logged(
7105 "Union schemas should have at least two members! \
7106 Please enable debug logging to find out which Record schema \
7107 declares the union with 'RUST_LOG=apache_avro::schema=debug'.",
7108 );
7109
7110 Ok(())
7111 }
7112
7113 #[test]
7114 fn avro_3965_fixed_schema_with_default_bigger_than_size() -> TestResult {
7115 match Schema::parse_str(
7116 r#"{
7117 "type": "fixed",
7118 "name": "test",
7119 "size": 1,
7120 "default": "123456789"
7121 }"#,
7122 ) {
7123 Ok(_schema) => panic!("Must fail!"),
7124 Err(err) => {
7125 assert_eq!(
7126 err.to_string(),
7127 "Fixed schema's default value length (9) does not match its size (1)"
7128 );
7129 }
7130 }
7131
7132 Ok(())
7133 }
7134
7135 #[test]
7136 fn avro_4004_canonical_form_strip_logical_types() -> TestResult {
7137 let schema_str = r#"
7138 {
7139 "type": "record",
7140 "name": "test",
7141 "fields": [
7142 {"name": "a", "type": "long", "default": 42, "doc": "The field a"},
7143 {"name": "b", "type": "string", "namespace": "test.a"},
7144 {"name": "c", "type": "long", "logicalType": "timestamp-micros"}
7145 ]
7146 }"#;
7147
7148 let schema = Schema::parse_str(schema_str)?;
7149 let canonical_form = schema.canonical_form();
7150 let fp_rabin = schema.fingerprint::<Rabin>();
7151 assert_eq!(
7152 r#"{"name":"test","type":"record","fields":[{"name":"a","type":"long"},{"name":"b","type":"string"},{"name":"c","type":{"type":"long"}}]}"#,
7153 canonical_form
7154 );
7155 assert_eq!("92f2ccef718c6754", fp_rabin.to_string());
7156 Ok(())
7157 }
7158
7159 #[test]
7160 fn avro_4055_should_fail_to_parse_invalid_schema() -> TestResult {
7161 let invalid_schema_str = r#"
7163 {
7164 "type": "record",
7165 "name": "SampleSchema",
7166 "fields": [
7167 {
7168 "name": "order",
7169 "type": "record",
7170 "fields": [
7171 {
7172 "name": "order_number",
7173 "type": ["null", "string"],
7174 "default": null
7175 },
7176 { "name": "order_date", "type": "string" }
7177 ]
7178 }
7179 ]
7180 }"#;
7181
7182 let schema = Schema::parse_str(invalid_schema_str);
7183 assert!(schema.is_err());
7184 assert_eq!(
7185 schema.unwrap_err().to_string(),
7186 "Invalid schema: There is no type called 'record', if you meant to define a non-primitive schema, it should be defined inside `type` attribute. Please review the specification"
7187 );
7188
7189 let valid_schema = r#"
7190 {
7191 "type": "record",
7192 "name": "SampleSchema",
7193 "fields": [
7194 {
7195 "name": "order",
7196 "type": {
7197 "type": "record",
7198 "name": "Order",
7199 "fields": [
7200 {
7201 "name": "order_number",
7202 "type": ["null", "string"],
7203 "default": null
7204 },
7205 { "name": "order_date", "type": "string" }
7206 ]
7207 }
7208 }
7209 ]
7210 }"#;
7211 let schema = Schema::parse_str(valid_schema);
7212 assert!(schema.is_ok());
7213
7214 Ok(())
7215 }
7216}